说明:
涉及技术
- Kubernetes Stream:接收数据执行,提供实时返回数据流
- Django Channels:维持长连接,接收前端数据转给Kubernetes,同时将Kubernetes返回的数据发送给前端
- xterm.js:一个前端终端组件,用于模拟Terminal的界面显示
基本的数据流向是:用户 --> xterm.js --> django channels --> kubernetes stream,接下来看看具体的代码实现。
路由文件
先定义一个Websocket的url。
- my_project/routing.py
from django.urls import path
from django.conf.urls import url, include, re_path
from channels.routing import ProtocolTypeRouter, URLRouter
from tasks.consumer import MessagesConsumer, ImagesConsumer, WebSSHConsumer
# 用于Websocket认证, 集成了CookieMiddleware AuthMiddleware SessionMiddleware
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator
# self.scope['type'] 获取协议类型
# self.scope['url_route'] ['kwargs']['username']获取url中关键字参数
# channels routing是scope级别的, 一个连接只能由一个consumer接收和处理
# chaneels routing.py处理应用中WebSockets的路由,相当于Django中的urls.py
application = ProtocolTypeRouter({
# 'http':views, 普通的http请求不需要我们手动在这里添加, 框架会自动加载。
'websocket':AllowedHostsOriginValidator(AuthMiddlewareStack(
URLRouter([
re_path(r'^wts/(?P<podid>\d+)$',WebSSHConsumer), # 8.10
])))
})
上面的是正则匹配所有以wts/数字开头的websocket连接,都交由名为SSHConsumer的Consumer处理。
Consumer文件
Consumer的代码如下:
- my_project/apps/app/consumer.py
class KubeApi:
def __init__(self, namespace='default'):
config.load_kube_config()
self.namespace = namespace
def pod_exec(self, podname,container=""):
api_instance = client.CoreV1Api()
exec_command = [
"/bin/sh",
"-c",
'TERM=xterm-256color; export TERM; [ -x /bin/bash ] '
'&& ([ -x /usr/bin/script ] '
'&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash) '
'|| exec /bin/sh']
cont_stream = stream(api_instance.connect_get_namespaced_pod_exec,
name=podname,
namespace=self.namespace,
container=container,
command=exec_command,
stderr=True, stdin=True,
stdout=True, tty=True,
_preload_content=False
)
return cont_stream
class K8SStreamThread(Thread):
def __init__(self, websocket, container_stream):
Thread.__init__(self)
self.websocket = websocket
self.stream = container_stream
def run(self):
while self.stream.is_open():
if self.stream.peek_stdout():
stdout = self.stream.read_stdout()
self.websocket.send(stdout)
if self.stream.peek_stderr():
stderr = self.stream.read_stderr()
self.websocket.send(stderr)
else:
self.websocket.close()
class WebSSHConsumer(WebsocketConsumer):
'''
处理WebSSH的websocket请求
'''
def connect(self):
request_session = self.scope['session']
if not request_session.get('userId'):
# 未登录用户拒绝连接
self.close()
self.podid = self.scope["url_route"]["kwargs"]["podid"]
if not self.podid:
self.close()
if not Pod.objects.filter(id=self.podid).exists():
self.close()
pod = Pod.objects.get(id=self.podid)
# 不是running状态的拒绝连接
if pod.status != 'Running':
self.close()
try:
self.stream = KubeApi().pod_exec(pod.name)
kub_stream = K8SStreamThread(self, self.stream)
kub_stream.start()
self.accept()
except Exception as e:
print(e)
self.close()
def disconnect(self,close_code):
self.stream.write_stdin('exit\r')
def receive(self, text_data=None, bytes_data=None):
self.stream.write_stdin(text_data)
主要思路
Kubernetes本身提供了stream方法来实现exec的功能,返回的就是一个WebSocket可以使用的数据流,使用起来也非常方便。KubeApi
类就是这个作用。WebSSH可以看作是一个最简单的websocket长连接,每个连接建立后都是独立的,不会跟其他连接共享数据,所以这里不需要用到Group
。
当连接建立时通过self.scope["url_route"]["kwargs"]["podid"]
获取到url中的podid,在数据库查询后得到podname传给KubeApi
类,同时会新起一个线程不断循环是否有新数据产生,如果有则发送给websocket。
当websocket接收到数据就直接写入Kubernetes API,当websocket关闭则会发送个exit
命令给Kubernetes。
这样后台就成功实现了WebSSH这个功能!回顾下,代码量很少,重点是要理解其中的含义。