修改日期 | 修改人 | 备注 |
2020-02-20 21:42:35[当前版本] | 陈胜涛 | 创建版本 |
2020-02-20 21:40:53 | 陈胜涛 | 创建版本 |
整个连接过程,我们从抓包结果来分析,只关注AMQP协议即可
第一步客户端发送Protocol-Hearder
服务端响应Connection.start
客户端发送Connection.Start-Ok
服务端发送Connection.Tune
客户端发送Connection.Tune-Ok
客户端发送Connection.Open
服务端发送Connection.Open-Ok
创建连接的工厂ConnectionFactory,包含创建连接的一些必备参数。
核心方法newConnection()
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName) throws IOException, TimeoutException { if(this.metricsCollector == null) { //这是一个空的收集,没有任何操作 this.metricsCollector = new NoOpMetricsCollector(); } // make sure we respect the provided thread factory //根据参数创建FrameHandlerFactory,BIO和NIO FrameHandlerFactory fhFactory = createFrameHandlerFactory(); //此executor是用于消费者消费使用的(consumerWorkServiceExecutor) ConnectionParams params = params(executor); // set client-provided via a client property if (clientProvidedName != null) { Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties()); properties.put("connection_name", clientProvidedName); params.setClientProperties(properties); } //默认自动恢复连接 if (isAutomaticRecoveryEnabled()) { // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector); conn.init(); return conn; } else { List<Address> addrs = addressResolver.getAddresses(); Exception lastException = null; for (Address addr : addrs) { try { FrameHandler handler = fhFactory.create(addr); AMQConnection conn = createConnection(params, handler, metricsCollector); conn.start(); this.metricsCollector.newConnection(conn); return conn; } catch (IOException e) { lastException = e; } catch (TimeoutException te) { lastException = te; } } if (lastException != null) { if (lastException instanceof IOException) { throw (IOException) lastException; } else if (lastException instanceof TimeoutException) { throw (TimeoutException) lastException; } } throw new IOException("failed to connect"); } }
createFrameHandlerFactory,FrameHandlerFactory
protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException { if(nio) { if(this.frameHandlerFactory == null) { if(this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) { this.nioParams.setThreadFactory(getThreadFactory()); } this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContext); } return this.frameHandlerFactory; } else { return new SocketFrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL(), this.shutdownExecutor); } }
AMQConnection详细注释
起来启动的核心代码
/** * Start up the connection, including the MainLoop thread. * Sends the protocol * version negotiation header, and runs through * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then * calls Connection.Open and waits for the OpenOk. Sets heart-beat * and frame max values after tuning has taken place. * @throws IOException if an error is encountered * either before, or during, protocol negotiation; * sub-classes {@link ProtocolVersionMismatchException} and * {@link PossibleAuthenticationFailureException} will be thrown in the * corresponding circumstances. {@link AuthenticationFailureException} * will be thrown if the broker closes the connection with ACCESS_REFUSED. * If an exception is thrown, connection resources allocated can all be * garbage collected when the connection object is no longer referenced. * * * client收到Connection.Tune方法后,必须要开始发送心跳, * 并在收到Connection.Open后,必须要开始监控.server在收到Connection.Tune-Ok后, * 需要开始发送和监控心跳. */ public void start() throws IOException, TimeoutException { //创建Consumer服务 initializeConsumerWorkService(); //创建长连接心跳 initializeHeartbeatSender(); //判断主循环是否在运行中 this._running = true; // Make sure that the first thing we do is to send the header, // which should cause any socket errors to show up for us, rather // than risking them pop out in the MainLoop /** * 先发送header,确保socket是否会发生错误,比在MainLoop(主事件循环)去确保要好。 */ AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation(); // We enqueue an RPC continuation here without sending an RPC // request, since the protocol specifies that after sending // the version negotiation header, the client (connection // initiator) is to wait for a connection.start method to // arrive. _channel0.enqueueRpc(connStartBlocker); try { // The following two lines are akin to AMQChannel's // transmit() method for this pseudo-RPC. _frameHandler.setTimeout(handshakeTimeout); //发送一个协议头开始新的连接,格式为'AMQP0091' _frameHandler.sendHeader(); } catch (IOException ioe) { _frameHandler.close(); throw ioe; } /** * 此处就是启动MainLoop(源码是 connection.startMainLoop()) * 把连接启动放入到framehandler的 initialize()方法中,这样的设计是否合理?? */ this._frameHandler.initialize(this); AMQP.Connection.Start connStart; AMQP.Connection.Tune connTune = null; try { connStart = (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod(); _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties()); Version serverVersion = new Version(connStart.getVersionMajor(), connStart.getVersionMinor()); if (!Version.checkVersion(clientVersion, serverVersion)) { throw new ProtocolVersionMismatchException(clientVersion, serverVersion); } //mechanisms(机制),返回的数据形如:AMQPLAIN PLAIN String[] mechanisms = connStart.getMechanisms().toString().split(" "); SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms); if (sm == null) { throw new IOException("No compatible authentication mechanism found - " + "server offered [" + connStart.getMechanisms() + "]"); } LongString challenge = null; LongString response = sm.handleChallenge(null, this.username, this.password); do { //构建Start-OK(认证机制) Method method = (challenge == null) ? new AMQP.Connection.StartOk.Builder() .clientProperties(_clientProperties) .mechanism(sm.getName()) .response(response) .build() : new AMQP.Connection.SecureOk.Builder().response(response).build(); try { Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod(); if (serverResponse instanceof AMQP.Connection.Tune) { connTune = (AMQP.Connection.Tune) serverResponse; } else { challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge(); response = sm.handleChallenge(challenge, this.username, this.password); } } catch (ShutdownSignalException e) { Method shutdownMethod = e.getReason(); if (shutdownMethod instanceof AMQP.Connection.Close) { AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod; if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) { throw new AuthenticationFailureException(shutdownClose.getReplyText()); } } throw new PossibleAuthenticationFailureException(e); } } while (connTune == null); } catch (TimeoutException te) { _frameHandler.close(); throw te; } catch (ShutdownSignalException sse) { _frameHandler.close(); throw AMQChannel.wrap(sse); } catch(IOException ioe) { _frameHandler.close(); throw ioe; } try { //协商通道最大编号,协商规则如下 // (clientValue == 0 || serverValue == 0) ?Math.max(clientValue, serverValue) :Math.min(clientValue, serverValue); int channelMax = negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax()); //创建通道管理器 _channelManager = instantiateChannelManager(channelMax, threadFactory); //协商Frame的最大长度 int frameMax = negotiatedMaxValue(this.requestedFrameMax, connTune.getFrameMax()); this._frameMax = frameMax; //协商出心跳时间 int heartbeat = negotiatedMaxValue(this.requestedHeartbeat, connTune.getHeartbeat()); //发送心跳 setHeartbeat(heartbeat); //发送TuneOk _channel0.transmit(new AMQP.Connection.TuneOk.Builder() .channelMax(channelMax) .frameMax(frameMax) .heartbeat(heartbeat) .build()); //发送Open打开连接 _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder() .virtualHost(_virtualHost) .build()); } catch (IOException ioe) { _heartbeatSender.shutdown(); _frameHandler.close(); throw ioe; } catch (ShutdownSignalException sse) { _heartbeatSender.shutdown(); _frameHandler.close(); throw AMQChannel.wrap(sse); } // We can now respond to errors having finished tailoring the connection this._inConnectionNegotiation = false; }
jannals
发布了137 篇原创文章 · 获赞 22 · 访问量 10万+
私信 关注
展开阅读全文
发表评论
添加代码片
还能输入1000个字符
[一]RabbitMQ-客户端源码之ConnectionFactory 04-17 阅读数 56
首先看一段amqp-client发送端的示例代码(展示出主要部分):ConnectionFactory factory = new ConnectionFactory();factory.setHos... 博文 来自: weixin_33725722的博客
RabbitMQ知识整合及探究 02-14 阅读数 1万+
RabbitMQ知识整合及探究1、总体介绍1.1 RabbitMQ是一个由Erlang语言编写的AMQP协议的开源实现.1.2 RabbitMQ是信息传输的中间者.本质上,他从生产者(producer... 博文 来自: lsxf_xin的专栏
RabbitMQ-c学习和开发client经验分享 09-07 阅读数 4885
RabbitMQ-c学习和开发client经验分享最近在学习RabbitMQ,并且需要开发一个client,向RabbitMQ server发送日志消息。记录其中遇到的坑。我使用的开发语言是C++。俗... 博文 来自: CODEING的专栏
130 个相见恨晚的超实用网站,一次性分享出来 01-06 阅读数 22万+
相见恨晚的超实用网站持续更新中。。。 博文 来自: 藏冰的博客
SpringAmqp之connectionFactory 11-02 阅读数 966
文章目录版本说明问题产生解决方案模拟内部实现CachingConnectionFactory原文引用对象说明多ConnectionFactory配置生产者执行效果消费者执行效果版本说明 &... 博文 来自: kulolo的博客
[二]RabbitMQ-客户端源码之AMQConnection 04-17 阅读数 27
上一篇文章([一]RabbitMQ-客户端源码之ConnectionFactory)中阐述了conn.start()方法完成之后客户端就已经和broker建立了正常的连接,而这个Connection的... 博文 来自: weixin_34050005的博客
《奇巧淫技》系列-python!!每天早上八点自动发送天气预报邮件到QQ邮箱 01-19 阅读数 8万+
将代码部署服务器,每日早上定时获取到天气数据,并发送到邮箱。也可以说是一个小人工智障。思路可以运用在不同地方,主要介绍的是思路。... 博文 来自: SunriseCai的博客
字节跳动视频编解码面经 11-20 阅读数 12万+
三四月份投了字节跳动的实习(图形图像岗位),然后hr打电话过来问了一下会不会opengl,c++,shador,当时只会一点c++,其他两个都不会,也就直接被拒了。七月初内推了字节跳动的提前批,因为内... 博文 来自: ljh_shuai的博客
rabbitmq源码分析(一) 04-06 阅读数 3143
因为学习erlang,下载了传说中的rabbitmq,这个是工业级,学习一下还是有必要的。看看大师们如何用erlang,下载地址:http://www.rabbitmq.com/server.html... 博文 来自: yuanhailu01的专栏
RabbitMQ客户端源码分析(八)之NIO 10-28 阅读数 373
文章目录NioLoopContextNioLoopSelectorHolderSocketChannelFrameHandlerStateNioLoopContext主要用于NIO事件循环的管理,根据... 博文 来自: jannal专栏
RabbitMQ Connection Channel 详解 04-07 阅读数 1775
首先展示网络上的两种图:AMQP :Rabbit各关键组件交换流程:Rabbit 内部线程图:1.ConnectionFactory、Connection、ChannelConnectionFacto... 博文 来自: BIGBAI的博客
程序员成长的四个简单技巧,你 get 了吗? 10-23 阅读数 2万+
最近拜读了“阿里工程师的自我修养”手册,12 位技术专家分享生涯感悟来帮助我们这些菜鸡更好的成长,度过中年危机,我收获颇多,其中有不少的方法技巧和我正在使用的,这让我觉得我做的这些事情是对的,我走在了... 博文 来自: 平头哥的技术博文
RabbitMQ客户端源码分析(七)之Channel与ChannelManager 10-20 阅读数 249
文章目录RabbitMQ-java-client版本ChannelChannelManagerRabbitMQ-java-client版本com.rabbitmq:amqp-client:4.3.0R... 博文 来自: jannal专栏
Erlang:RabbitMQ源码分析 1. 启动过程 08-24 阅读数 4285
RabbitMQ源码分析 1. 启动过程 博文 来自: liaosongbo的专栏
我花了一夜用数据结构给女朋友写个H5走迷宫游戏 09-21 阅读数 38万+
起因又到深夜了,我按照以往在csdn和公众号写着数据结构!这占用了我大量的时间!我的超越妹妹严重缺乏陪伴而 怨气满满!而女朋友时常埋怨,认为数据结构这么抽象难懂的东西没啥作用,常会问道:天天写这玩意,... 博文 来自: bigsai
Python——画一棵漂亮的樱花树(不同种樱花+玫瑰+圣诞树喔) 10-22 阅读数 20万+
最近翻到一篇知乎,上面有不少用Python(大多是turtle库)绘制的树图,感觉很漂亮,我整理了一下,挑了一些我觉得不错的代码分享给大家(这些我都测试过,确实可以生成)one 樱花树 动态生成樱花效... 博文 来自: 碎片
如何优化MySQL千万级大表,我写了6000字的解读 10-21 阅读数 4万+
这是学习笔记的第2138篇文章千万级大表如何优化,这是一个很有技术含量的问题,通常我们的直觉思维都会跳转到拆分或者数据分区,在此我想做一些补充和梳理,想和大家做一些这方面的经验总结,也欢迎大家提出建议... 博文 来自: 杨建荣的学习笔记
RabbitMQ设置SSL相关操作 11-12 阅读数 2431
相关文件openssl.cnf 文件配置[ ca ] default_ca = testca ... 博文 来自: 一只没有脚的鸟
Java学习的正确打开方式 01-08 阅读数 20万+
在博主认为,对于入门级学习java的最佳学习方法莫过于视频+博客+书籍+总结,前三者博主将淋漓尽致地挥毫于这篇博客文章中,至于总结在于个人,实际上越到后面你会发现学习的最好方式就是阅读参考官方文档其次... 博文 来自: 程序员宜春的博客
RabbitMQ客户端源码分析(二)之Frame与FrameHandler 10-14 阅读数 273
文章目录版本声明Frame(帧)分析AMQP帧(Frame)的格式Frame源码分析FrameHandlerSocketFrameHandlerSocketChannelFrameHandlerFra... 博文 来自: jannal专栏
01.JDBC的高级使用——ConnectionFactory 04-25 阅读数 926
JDBC的高级使用——ConnectionFactoryConnectionFactory:连接工厂ConnectionFactory的作用利用工厂模式提升代码的重用性封装注册数据库的驱动和获得数据库... 博文 来自: 远方水木
RabbitMQ配置详解 09-16 阅读数 2117
**一,前期准备** 准备安装好的rabbit环境和账号信息,如下:#rabbitmqrabbit.server.host=192.168.*.*rabbit.server.port=5672rabb... 博文 来自: qq_34683450的博客
项目中的if else太多了,该怎么重构? 11-11 阅读数 14万+
介绍最近跟着公司的大佬开发了一款IM系统,类似QQ和微信哈,就是聊天软件。我们有一部分业务逻辑是这样的if (msgType = "文本") { // dosomething} else if(msg... 博文
连接rabbitmq 报错 com.rabbitmq.client.ShutdownSignalException: connection error 11-01 阅读数 3995
ConnectionFactory factory = new ConnectionFactory(); // 连接IP factory.setHost(&... 博文 来自: starzxf的专栏
Python 基础(一):入门必备知识 10-30 阅读数 12万+
Python 入门必备知识,你都掌握了吗? 博文 来自: 程序之间
刷了几千道算法题,这些我私藏的刷题网站都在这里了! 11-08 阅读数 10万+
遥想当年,机缘巧合入了 ACM 的坑,周边巨擘林立,从此过上了"天天被虐似死狗"的生活…然而我是谁,我可是死狗中的战斗鸡,智力不够那刷题来凑,开始了夜以继日哼哧哼哧刷题的日子,从此"读题与提交齐飞, ... 博文 来自: Rocky0429
在线就能用的 SQL 练习平台我给你找好了! 11-18 阅读数 1万+
大家好,我是 Rocky0429,一个最近正在学习 SQL 的蒟蒻…在看完了某 《xxx 必知必会》以后,我觉得我膨胀了,立马某度 “xxx SQL 面试 100 题”、”SQL 岗位 xxx 个面试... 博文 来自: Rocky0429
RabbitMQ客户连接池的实现 11-03 阅读数 1万+
RabbitMQ客户连接池的实现 博文 来自: 李泽昊
英特尔不为人知的 B 面 11-05 阅读数 4万+
从 PC 时代至今,众人只知在 CPU、GPU、XPU、制程、工艺等战场中,英特尔在与同行硬件芯片制造商们的竞争中杀出重围,且在不断的成长进化中,成为全球知名的半导体公司。殊不知,在「刚硬」的背后,英... 博文 来自: CSDN资讯
[Conclusion]RabbitMQ-客户端源码之总结 04-17 阅读数 12
RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上“搜刮”点散碎的知识点... 博文 来自: weixin_34250709的博客
吃人的那些 Java 名词:对象、引用、堆、栈 09-05 阅读数 3万+
作为一个有着 8 年 Java 编程经验的 IT 老兵,说起来很惭愧,我被 Java 当中的四五个名词一直困扰着:**对象、引用、堆、栈、堆栈**(栈可同堆栈,因此是四个名词,也是五个名词)。每次我看... 博文 来自: 沉默王二
60 个让程序员崩溃的瞬间,哈哈哈哈哈哈哈哈哈 12-18 阅读数 12万+
阅读本文大概需要 2.3333 分钟。前方高能,每一个程序员看完,你不笑死个人,你来找我,我自己看了好几遍,反正笑的停不下来,太特么有才了。1. 公司实习生找 Bug2....... 博文 来自: stormzhang的专栏
Java 8:一文掌握 Lambda 表达式 10-23 阅读数 1万+
本文将介绍 Java 8 新增的 Lambda 表达式,包括 Lambda 表达式的常见用法以及方法引用的用法,并对 Lambda 表达式的原理进行分析,最后对 Lambda 表达式的优缺点进行一个总... 博文 来自: Android 大强哥的博客
如何正确应对面试最后一问:你有什么问题想问我吗? 10-21 阅读数 7299
点击蓝色“程序猿DD”关注我回复“资源”获取独家整理的学习资料!作者 | 干货小分队来源 |blog.didispace.com尽管,我们之前分享了这么多关于面试的主题...... 博文 来自: 程序猿DD
2020年秋招后端面经 10-24 阅读数 1万+
主要包含华为,网易互娱,广联达,科大讯飞,浦发,中兴,上海农商行这些已经拿offer的,还有京东(不小心把二面时间换了一下,等通知等三个月了),虾皮(一面挂),顺丰(sp专场一面之后没消息,秋招第一个... 博文 来自: xianlvfan2224的博客
程序员必须掌握的核心算法有哪些? 12-26 阅读数 31万+
由于我之前一直强调数据结构以及算法学习的重要性,所以就有一些读者经常问我,数据结构与算法应该要学习到哪个程度呢?,说实话,这个问题我不知道要怎么回答你,主要取决于你想学习到哪些程度,不过针对这个问题,... 博文 来自: 帅地
RabbitMQ 中 Connection 和 Channel 详解 01-27 阅读数 147
我们知道无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是 Connection。一旦 TCP 连接建立起来,客户端紧接着可以创建一个 ... 博文 来自: weixin_33768481的博客
【算法技巧】位运算装逼指南 05-16 阅读数 5724
位算法的效率有多快我就不说,不信你可以去用 10 亿个数据模拟一下,今天给大家讲一讲位运算的一些经典例子。不过,最重要的不是看懂了这些例子就好,而是要在以后多去运用位运算这些技巧,当然,采用位运算,也... 博文 来自: weixin_34014555的博客
rabbitmq连接出错,记录一下 10-25 阅读数 1579
Exceptioninthread"main"java.io.IOException atcom.rabbitmq.client.impl.AMQChannel.wrap(AM... 博文 来自: 杨鹏飞的博客
python json java mysql pycharm android linux json格式 c#导入fbx c#中屏蔽键盘某个键 c#正态概率密度 c#和数据库登陆界面设计 c# 高斯消去法 c# codedom c#读取cad文件文本 c# 控制全局鼠标移动 c# temp 目录 bytes初始化 c#
没有更多推荐了,返回首页
©️2019 CSDN 皮肤主题: 像素格子 设计师: CSDN官方博客
jannals
TA的个人主页 >
勋章:
GitHub
绑定GitHub第三方账户获取
专栏达人
授予成功创建个人博客专栏的用户。专栏中添加五篇以上博文即可点亮!撰写博客专栏浓缩技术精华,专栏达人就是你!
持之以恒
授予每个自然月内发布4篇或4篇以上原创或翻译IT博文的用户。不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累!
勤写标兵Lv4
授予每个自然周发布9篇以上(包括9篇)原创IT博文的用户。本勋章将于次周周三上午根据用户上周的博文发布情况由系统自动颁发。
关注
私信
展开
手机看
打赏
5C币 10C币 20C币 50C币 100C币 200C币
确定
×
扫一扫,手机浏览