一、背景
在最近的項(xiàng)目中的一個(gè)需求是消息實(shí)時(shí)推送消息以及通知功能,項(xiàng)目使用django寫(xiě)的所以決定采用django-channels來(lái)實(shí)現(xiàn)websocket進(jìn)行實(shí)時(shí)通訊。目前官方已經(jīng)更新到2.1版本,相對(duì)于老的channels 1.x版本有了很大變化,無(wú)論是使用方式還是功能,其中最大的變化莫過(guò)于2.x版本中帶來(lái)的asyncio特性,可使用異步處理模式。本文內(nèi)容將介紹channels2版本使用,由于項(xiàng)目django是1.11,其中也遇到了一些坑,比如在channels在處理一次請(qǐng)求后hang住然后報(bào)錯(cuò),后面修改了下django1.11版本的一點(diǎn)源碼得以解決,2.0版本應(yīng)該不會(huì)有問(wèn)題。
二、channels介紹
channels是以django插件的形式存在,它不僅能處理http請(qǐng)求,還提供對(duì)websocket、MQTT等長(zhǎng)連接支持。不僅如此,channels在保留了原生django的同步和易用的特性上還帶來(lái)了異步處理方式(channels2.X版本),并且將django自帶的認(rèn)證系統(tǒng)以及session集成到模塊中,擴(kuò)展性非常強(qiáng)。官方文檔:https://channels.readthedocs.io/en/latest/index.html
三、安裝以及安裝需求
channels2.0最低django版本要求是1.11+,python3.5+。筆者的版本是django1.11,直接安裝可能有問(wèn)題,以下是測(cè)試通過(guò)的版本。
筆者的相關(guān)版本如下:
Django==1.11.10 channels==2.1.4 channels-redis==2.3.1 asgiref==2.1.6 asgi-redis==1.4.3
如果django版本比較高直接采用pip安裝:
pip3 install channels pip3 install channels-redis #可選的,官方推薦如果使用redis作為channel layer
redis安裝可以參考博客:https://www.gxlcms.com/article/151522.htm
四、開(kāi)始使用
一、配置settings.py
筆者采用的redis作為channel layer(關(guān)于其介紹請(qǐng)移步至https://channels.readthedocs.io/en/latest/topics/channel_layers.html),它是實(shí)現(xiàn)消息推送的核心,在項(xiàng)目的settings.py中:
注冊(cè)channles app:
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'cmdb', 'channels', #注冊(cè)app ]
配置channels layer:
ASGI_APPLICATION = 'devops.routing.application' CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': { "hosts": [('10.1.210.33', 6379)], #需修改 }, }, }
二、路由配置
在項(xiàng)目settings文件同級(jí)目錄中新增routing.py
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter import deploy.routing application = ProtocolTypeRouter({ 'websocket': AuthMiddlewareStack( URLRouter( deploy.routing.websocket_urlpatterns# 指明路由文件是devops/routing.py ) ), })
最后在app里配置路由和對(duì)應(yīng)的消費(fèi)者,筆者這里是devops下的routing.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from django.conf.urls import url from . import consumers websocket_urlpatterns = [ url(r'^ws/deploy/(?P<service_name>[^/]+)/$', consumers.DeployResult), #consumers.DeployResult 是該路由的消費(fèi)者 ]
項(xiàng)目目錄結(jié)構(gòu)如下:
三、編寫(xiě)webscoket消息處理方法(消費(fèi)者)
首先說(shuō)明,消費(fèi)者是Channels代碼的基本單元,當(dāng)一個(gè)新的Socket進(jìn)入的時(shí)候,Channels會(huì)根據(jù)路由表找到正確的消費(fèi)者,以下代碼中每個(gè)方法都可以看作一個(gè)消費(fèi)者,他們消費(fèi)不同的event,比如剛剛接受連接時(shí)候connect方法進(jìn)行消費(fèi)處理并接受連接,關(guān)閉websocket時(shí)候使用disconnect進(jìn)行消費(fèi)處理。
deploy/consumers.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from channels.generic.websocket import AsyncWebsocketConsumer import json class DeployResult(AsyncWebsocketConsumer): async def connect(self): self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"] self.chat_group_name = 'chat_%s' % self.service_uid # 收到連接時(shí)候處理, await self.channel_layer.group_add( self.chat_group_name, self.channel_name ) await self.accept() async def disconnect(self, close_code): # 關(guān)閉channel時(shí)候處理 await self.channel_layer.group_discard( self.chat_group_name, self.channel_name ) # 收到消息 async def receive(self, text_data): text_data_json = json.loads(text_data) message = text_data_json['message'] print("收到消息--》",message) # 發(fā)送消息到組 await self.channel_layer.group_send( self.chat_group_name, { 'type': 'client.message', 'message': message } ) # 處理客戶(hù)端發(fā)來(lái)的消息 async def client_message(self, event): message = event['message'] print("發(fā)送消息。。",message) # 發(fā)送消息到 WebSocket await self.send(text_data=json.dumps({ 'message': message }))
以上代碼部分說(shuō)明:
1.self.scope是單個(gè)連接傳入的詳細(xì)信息,其中包含了請(qǐng)求的session、以及django認(rèn)證系統(tǒng)中的用戶(hù)信息等;
2.async...await 是python3.5之后的新異步特性,基于asyncio模塊;
四、發(fā)起webscoket請(qǐng)求
利用js發(fā)起websocket請(qǐng)求
function InitWebSocket() { var websocket = new WebSocket( 'ws://' + window.location.host + '/ws/deploy/tasks/' ); websocket.onmessage = function (e) { var data = JSON.parse(e.data); var message = '\n' + data['message']; document.querySelector('#deploy-res').innerText += (message + '\n'); }; }
五、發(fā)送消息到channel
無(wú)論是消息的推送或者消息的接受,都是經(jīng)過(guò)channel layer進(jìn)行傳輸,以下是發(fā)送消息示例,
from channels.layers import get_channel_layer from asgiref.sync import async_to_sync channel_layer = get_channel_layer() def send_channel_msg(channel_name, msg): """ send msg to channel :param channel_name: :param msg: :return: """ async_to_sync(channel_layer.group_send)(channel_name, {"type": "deploy.run", "text": msg})
六、生產(chǎn)部署
大多數(shù)django的應(yīng)用部署方式都采用的是nginx+uwsgi進(jìn)行部署,當(dāng)django集成channels時(shí)候,由于uwsgi不能處理websocket請(qǐng)求,所以我們需要asgi服務(wù)器來(lái)處理websocket請(qǐng)求,官方推薦使用daphne。下一篇文章將介紹nginx+supervisor+daphne+uwsgi進(jìn)行生產(chǎn)部署。
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com