Our alerting bot frequently reports the following warnings:
<27>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Socket Error: 104
<31>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Removing timeout for next heartbeat interval
<28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Socket closed when connection was open
<31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Added: {'callback': <bound method SelectConnection._on_connection_start of <pika.adapters.select_connection.SelectConnection object at 0x7f74752525d0>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 1}
<28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Disconnected from RabbitMQ at xx_host:5672 (0): Not specified
<31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Processing 0:_on_connection_closed
<31>1 2018-xx-xxT06:59:03.040Z 660ece0ebaad admin/admin 14 - - Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f74752513f8>> for "0:_on_connection_closed"
Determine the location of the error
With a log message in hand, this is easy to track down. The first step is to find out which code wrote the log. There are three places to look.
Our own code
No match there.
uwsgi code
root@660ece0ebaad:/# uwsgi --version
2.0.14
I pulled the source for this version from GitHub and searched it: no match there either.
Python library code
Run the following inside the container to list the import paths:
>>> import sys
>>> sys.path
['', '/usr/lib/python2.7', '/usr/lib/python2.7/plat-x86_64-linux-gnu', '/usr/lib/python2.7/lib-tk', '/usr/lib/python2.7/lib-old', '/usr/lib/python2.7/lib-dynload', '/usr/local/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages/PILcompat', '/usr/lib/python2.7/dist-packages/gtk-2.0']
Grepping these directories finds the message in pika:
root@660ece0ebaad:/usr/local/lib/python2.7# grep "Socket Error" -R .
Binary file ./dist-packages/pika/adapters/base_connection.pyc matches
./dist-packages/pika/adapters/base_connection.py: LOGGER.error("Fatal Socket Error: %r", error_value)
./dist-packages/pika/adapters/base_connection.py: LOGGER.error("Socket Error: %s", error_code)
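When grep is not available in the container, the same search can be done from Python. The following is a minimal sketch, not part of the original investigation; `find_in_sources` is a name made up for this example, and it only looks at `.py` files under the directories in `sys.path`.

```python
import os
import sys


def find_in_sources(needle, roots=None):
    """Yield (path, line_number, line) for each .py line containing needle."""
    wanted = needle.encode("utf-8")
    seen = set()  # sys.path entries can be nested, so avoid reading a file twice
    for root in (sys.path if roots is None else roots):
        if not os.path.isdir(root):
            continue  # skips '' and zip archives
        for dirpath, _dirnames, filenames in os.walk(root):
            for name in filenames:
                path = os.path.realpath(os.path.join(dirpath, name))
                if not name.endswith(".py") or path in seen:
                    continue
                seen.add(path)
                try:
                    with open(path, "rb") as handle:
                        for number, raw in enumerate(handle, 1):
                            if wanted in raw:
                                text = raw.decode("utf-8", "replace").strip()
                                yield path, number, text
                except (IOError, OSError):
                    continue  # unreadable file, ignore it
```

Calling `list(find_in_sources("Socket Error"))` in the container should point at the same two lines of base_connection.py that grep reported.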
Determine the pika version.
>>> import pika
>>> pika.__version__
'0.10.0'
As the code shows, the number after "Socket Error" is an errno value. Looking it up confirms that 104 means the peer sent an RST:
>>> import errno
>>> errno.errorcode[104]
'ECONNRESET'
My first suspicion was that the RabbitMQ server address was wrong, because a port that nobody is listening on also answers with an RST. Verification showed that the address was correct.
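To see what an RST looks like from the client side, here is a small loopback sketch. It is not pika code and makes some assumptions: the function name `provoke_reset` is invented, the reset is forced with `SO_LINGER` set to a zero timeout, and the error is observed on `recv` rather than on a send as in our case. On Linux the value it reports should be 104, the same code as in the log.

```python
import errno
import socket
import struct


def provoke_reset():
    """Return the errno a client sees when its peer aborts the connection."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(1)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.settimeout(5)
    try:
        client.connect(listener.getsockname())
        server_side, _addr = listener.accept()
        # SO_LINGER with a zero timeout makes close() send RST instead of FIN.
        server_side.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                               struct.pack("ii", 1, 0))
        server_side.close()
        try:
            client.recv(1)
        except socket.error as exc:
            return exc.errno
        return None  # no error: the peer closed cleanly
    finally:
        client.close()
        listener.close()


code = provoke_reset()
print(code, errno.errorcode.get(code))
```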
Next I suspected that the server had timed the connection out and closed it without the client noticing. The RabbitMQ server log contained many entries like these:
=ERROR REPORT==== 7-Dec-2018::20:43:18 ===
closing AMQP connection <0.9753.18> (172.17.0.19:27542 -> 192.168.44.112:5672):
missed heartbeats from client, timeout: 60s
--
=ERROR REPORT==== 7-Dec-2018::20:43:18 ===
closing AMQP connection <0.9768.18> (172.17.0.19:27544 -> 192.168.44.112:5672):
missed heartbeats from client, timeout: 60s
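With many such entries, it helps to count them per client. The sketch below assumes the log keeps exactly the two-line shape quoted above; the regular expression and the name `missed_heartbeats` are made up for this example.

```python
import re
from collections import Counter

# Matches a "closing AMQP connection" line followed by the heartbeat reason.
CLOSE_RE = re.compile(
    r"closing AMQP connection <[^>]+> "
    r"\((?P<client>[\d.]+):\d+ -> [\d.]+:\d+\):\s*\n"
    r"\s*missed heartbeats from client, timeout: (?P<timeout>\d+)s"
)


def missed_heartbeats(log_text):
    """Count connections closed for missed heartbeats, keyed by client IP."""
    return Counter(m.group("client") for m in CLOSE_RE.finditer(log_text))


sample = """=ERROR REPORT==== 7-Dec-2018::20:43:18 ===
closing AMQP connection <0.9753.18> (172.17.0.19:27542 -> 192.168.44.112:5672):
missed heartbeats from client, timeout: 60s
--
=ERROR REPORT==== 7-Dec-2018::20:43:18 ===
closing AMQP connection <0.9768.18> (172.17.0.19:27544 -> 192.168.44.112:5672):
missed heartbeats from client, timeout: 60s
"""
print(missed_heartbeats(sample))
```

For the two entries above, both closures are attributed to 172.17.0.19, the admin container.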
On the server, no connections from the admin Docker container remain:
root@xxxxxxx:/home/dingxinglong# netstat -nap | grep 5672 | grep "172.17.0.19"
So why does the RabbitMQ server drop the connections that pika established? The pika docstring gives a hint:
:param int heartbeat_interval: How often to send heartbeats.
                               Min between this value and server's proposal
                               will be used. Use 0 to deactivate heartbeats
                               and None to accept server's proposal.
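Read literally, this docstring describes a simple rule. The function below is a sketch of that rule as I understand it from the text; it is not copied from pika, and `effective_heartbeat` is a hypothetical name.

```python
def effective_heartbeat(client_value, server_proposal):
    """Pick the heartbeat interval the way the quoted docstring describes."""
    if client_value is None:
        return server_proposal  # accept the server's proposal
    if client_value == 0:
        return 0                # heartbeats deactivated
    return min(client_value, server_proposal)


print(effective_heartbeat(None, 60))  # parameter omitted
print(effective_heartbeat(0, 60))     # heartbeats switched off
```

With the 60-second timeout seen in the RabbitMQ log, omitting the parameter yields 60 and passing 0 disables heartbeats.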
We did not pass heartbeat_interval, so in theory the server's proposal of 60 seconds should have been used. In practice, the client never sent a single heartbeat frame, so I kept reading the code.
Debug prints confirmed that the HeartbeatChecker object and its timer were both created successfully, but the timer callback was never invoked.
Tracing the code further down: we use BlockingConnection, and the docstring of its add_timeout method explains why:
def add_timeout(self, deadline, callback_method):
    """Create a single-shot timer to fire after deadline seconds. Do not
    confuse with Tornado's timeout where you pass in the time you want to
    have your callback called. Only pass in the seconds until it's to be
    called.

    NOTE: the timer callbacks are dispatched only in the scope of
    specially-designated methods: see
    `BlockingConnection.process_data_events` and
    `BlockingChannel.start_consuming`.

    :param float deadline: The number of seconds to wait to call callback
    :param callable callback_method: The callback method with the signature
        callback_method()
Timers are dispatched from process_data_events, which we never call, so the client's heartbeat timer never fires. The simplest fix is to turn heartbeats off by passing heartbeat_interval=0.
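The effect is easy to reproduce with a toy model. The class below is only an illustration of the behavior the docstring describes, not pika's implementation; `PolledTimers` is an invented name that borrows the two method names from pika.

```python
import time


class PolledTimers(object):
    """Toy stand-in for a connection whose timers only fire when polled."""

    def __init__(self):
        self._timers = []  # list of (due_time, callback)

    def add_timeout(self, deadline, callback):
        # As in the docstring: deadline is seconds from now, single-shot.
        self._timers.append((time.time() + deadline, callback))

    def process_data_events(self):
        # Timers are dispatched here and nowhere else.
        now = time.time()
        due = [t for t in self._timers if t[0] <= now]
        self._timers = [t for t in self._timers if t[0] > now]
        for _due_time, callback in due:
            callback()


sent = []
conn = PolledTimers()
conn.add_timeout(0, lambda: sent.append("heartbeat"))
time.sleep(0.01)
print(sent)  # still empty: nothing has polled the timers
conn.process_data_events()
print(sent)  # the callback has now run
```

No matter how long the program waits, the callback runs only once something calls process_data_events. That matches what we saw: the timer existed but was never called back.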
Specific trigger point
Because no event loop is running, the FIN packet sent by the RabbitMQ server is never processed, so the client cannot track the connection state. Following the code path of the basic_publish interface all the way down: the send on the dead connection is answered with an RST, and execution ends up in the _handle_error function at base_connection.py:452, which logs the Socket Error. The calling code, with heartbeats already disabled, is as follows:
import json

import pika


def connect_mq():
    """Open a blocking connection and declare the exchange."""
    mq_conf = xxxxx
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(mq_conf['host'],
                                  int(mq_conf['port']),
                                  mq_conf['path'],
                                  pika.PlainCredentials(mq_conf['user'],
                                                        mq_conf['pwd']),
                                  # 0 turns heartbeats off
                                  heartbeat_interval=0))
    channel = connection.channel()
    channel.exchange_declare(exchange=xxxxx, type='direct', durable=True)
    return channel


# Created once at import time and shared by every publish call.
channel = connect_mq()


def notify_xxxxx():
    global channel

    def _publish(product):
        channel.basic_publish(exchange=xxxxx,
                              routing_key='xxxxx',
                              body=json.dumps({'msg': 'xxxxx'}))
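If heartbeats need to stay enabled, the other option is to give pika regular chances to run its timers. The helper below is a sketch under assumptions: `idle_with_heartbeats` is a hypothetical name, and `connection` is expected to be a pika BlockingConnection or any object with a compatible process_data_events(time_limit=...) method. It would replace a plain sleep in a process that has its own loop.

```python
import time


def idle_with_heartbeats(connection, seconds, step=1.0):
    """Wait for about `seconds` while letting the connection service itself."""
    deadline = time.time() + seconds
    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        # Handles pending I/O and due timers for up to time_limit seconds.
        connection.process_data_events(time_limit=min(step, remaining))
```

This only helps when the process can call it regularly from the thread that owns the connection. In a request-driven application like ours the connection sits idle between requests, which is why disabling heartbeats was the simpler fix.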