有時候後端要執行一個時間比較長的任務,而任務內容極為複雜,又容易出錯,因此希望讓使用者看到即時的 console log,讓我們函式中的 print 輸出能即時傳到使用者的瀏覽器。
以下將會以 Django, Thread, Queue 進行實做
StreamingHttpResponse
一般的網頁請求都是一次打包好所有資料,全部傳給使用者,有些情況我們不能等到所有資料準備好才一次傳,而要拿到一些就傳一些,這個時候我們就要使用串流輸出,在 Django 裡,就是使用 StreamingHttpResponse,以下簡稱 SHR。SHR 接收一個 Iterator 作為輸入,因此我們只要實做一個迭代器函式,其中每次 yield 就會由 SHR 傳送到瀏覽器
# Example of StreamingHttpResponse
from django.http.response import StreamingHttpResponse
def example():
for i in range(5):
# Add <br> to break line in browser
yield f'{i}<br>'
def stream(request):
return StreamingHttpResponse(example())
Output (in browser):
0
1
2
3
4
Thread
由於我們的程式需要一邊執行目標任務,一邊串流輸出,因此需要平行化執行。Python 中可以使用 threading, multiprocessing 等方式做平行化執行,本文將使用 threading。
# Example of threading
from threading import Thread
import time
def example(times):
for i in range(times):
print(i)
time.sleep(1)
# 建立 Thread
thread = Thread(target=example, args=(5,))
# 啟動 Thread
thread.start()
time.sleep(2)
print("This is printed in main thread")
# 等待 thread 完成
thread.join()
Output:
0
1
This is printed in main thread
2
3
4
重新導向 Stdout
Python 中 print 的輸出稱之為 stdout – Standard Output 標準輸出,簡單來說就是輸出到終端機的內容。要改變 print 的輸出位置,我們就要改變 sys.stdout 這個變數,stdout 可以接受任何 File-like 的物件,具體而言就是實做了 write
方法的物件。
# Example of redirect stdout
import sys
class Printer:
def __init__(self):
self.contents = []
def write(self, value):
self.contents.append(value)
printer = Printer()
sys.stdout = printer
print('This should be saved in printer')
sys.stdout = sys.__stdout__
print('This should be printed to stdout')
print(printer.contents)
Output:
This should be printed to stdout
['This should be saved in printer', '\n']
實做串流輸出 Stdout
環境
Python 3.8.5
Django 3.2
首先建立一個 Django Project
pip install django
django-admin startproject console_streaming
cd console_streaming
python manage.py startapp web
安裝 web:
# console_streaming/settings.py
INSTALLED_APPS = [
...
# 加入 web
'web',
]
建立 view:
# web/views.py
def stream(request):
# 待會實做
pass
綁定到 urls:
# console_streaming/urls.py
from django.urls import path
from web import views
urlpatterns = [
path('stream/', views.stream),
]
測試函式
這是我們用來模擬目標任務函式,重複 times 次的 print 出一行字,並等待一秒,最後 print 出 “Done”。
# web/views.py
import time
def job(times):
for i in range(times):
print(f'Task #{i}')
time.sleep(1)
print('Done')
time.sleep(0.5)
Printer class
以下實做一個 Printer 來處理所有的 stdout,並且整個程式生命週期只用一個 instance,原因是 sys.stdout 是不分 thread 的,因此如果不同的 request 使用不同的 printer,會搶走別人的 stdout。因此我使用一個 map 來儲存不同 thread 的 queue,並用 current_thread() 來判斷當前的 thread 取出正確的 queue。而假如當前的 thread 沒有註冊到 Printer,則使用 stdout 輸出。
# web/views.py
from queue import Queue
from threading import current_thread
import sys
class Printer:
def __init__(self):
self.queues = {}
def write(self, value):
'''handle stdout'''
queue = self.queues.get(current_thread().name)
if queue:
queue.put(value)
else:
sys.__stdout__.write(value)
def flush(self):
'''Django would crash without this'''
pass
def register(self, thread):
'''註冊一個 Thread'''
queue = Queue()
self.queues[thread.name] = queue
return queue
def clean(self, thread):
'''刪除一個 Thread'''
del self.queues[thread.name]
# 初始化一個 Printer instance
printer = Printer()
sys.stdout = printer
Streamer class
接下來要實做併發執行以及回傳 StreamingHttpResponse,我將併發執行的部份寫成一個 Streamer class,其會初始化一個 Thread,將其註冊到 printer 取得 queue,接著不斷讀取 queue 的內容,將內容 yield 到 StreamingHttpResponse,直到 thread 結束。
from threading import Thread
class Steamer:
def __init__(self, target, args):
self.thread = Thread(target=target, args=args)
self.queue = printer.register(self.thread)
def start(self):
self.thread.start()
print('This should be stdout')
while self.thread.is_alive():
try:
item = self.queue.get_nowait()
yield f'{item}<br>'
except Empty:
pass
yield 'End'
printer.clean(self.thread)
def stream(request):
streamer = Steamer(job, (10,))
return StreamingHttpResponse(streamer.start())
執行 Django
$ python manage.py runserver
打開 http://localhost:8000/stream/
就能看到像是以下的輸出
而每新增一個請求,終端機就可以看到一行
This should be stdout
完整 views.py
from django.http.response import StreamingHttpResponse
from queue import Queue, Empty
from threading import Thread, current_thread
import time
import sys
class Printer:
def __init__(self):
self.queues = {}
def write(self, value):
'''handle stdout'''
queue = self.queues.get(current_thread().name)
if queue:
queue.put(value)
else:
sys.__stdout__.write(value)
def flush(self):
'''Django would crash without this'''
pass
def register(self, thread):
'''註冊一個 Thread'''
queue = Queue()
self.queues[thread.name] = queue
return queue
def clean(self, thread):
'''刪除一個 Thread'''
del self.queues[thread.name]
printer = Printer()
sys.stdout = printer
class Steamer:
def __init__(self, target, args):
self.thread = Thread(target=target, args=args)
self.queue = printer.register(self.thread)
def start(self):
self.thread.start()
print('This should be stdout')
while self.thread.is_alive():
try:
item = self.queue.get_nowait()
yield f'{item}<br>'
except Empty:
pass
yield 'End'
printer.clean(self.thread)
def job(times):
for i in range(times):
print(f'Task #{i}')
time.sleep(1)
print('Done')
time.sleep(0.5)
def stream(request):
streamer = Steamer(job, (10,))
return StreamingHttpResponse(streamer.start())
完整程式碼:GitHub