因為檔案的大小一般都滿大的,處理起來一定會有延遲,如果handler 直接開始處理的話使用這一定會感受到網頁沒有回應,最後請教了強者我同學 NNNN 大大,經大大指點,才知道Google App Engine其實有提供taskqueue 來達成我要的功能。
簡單來說 task queue可以讓我們在背景執行耗時的工作,不影響到其他的服務,這篇是個簡單的範例:情境如下,使用者上傳要處理的檔案,處理過後會把處理的結果送到 ndb 的資料庫中儲存,因此使用者的 ndb model 中,加上一個 isReady 的boolean,用來記錄資料是否已經處理好,taskqueue 處理完之後就會把這一個boolean設為 True;如果他還是 false,前端就會顯示處理中的畫面(例如一個圓圈一直轉)。
Gae 的taskqueue 分為 push 跟 pull 兩種
- Push:你push 工作到queue 之後,你設定多久釋放一個工作,時間到了那個工作就會開始執行,push 的工作限定要在 10 分鐘內結束。
- Pull:這讓你決定何時從queue 中釋放工作出來執行,但你也要負擔更多管理queue 的工作。
from google.appengine.api import taskqueue task = taskqueue.add( url='/parse', target='worker', params={'user_id':user_id})https://cloud.google.com/appengine/docs/python/config/queueref
Target 指要執行 task 的module 是誰,另外可以指定instance, version,這裡我是依著範例叫做 worker,如果未指定module 的話,就會是預設的app.yaml 這個module:
文件是這麼寫:
A string naming a module/version, a frontend version, or a backend, on which to execute all of the tasks enqueued onto this queue. … If target is unspecified, then tasks are invoked on the same version of the application where they were enqueued.
url 用來在服務中,選擇對應的handler。params 則可帶入字典,指明要有哪些參數,亦可直接用 ?key=value 附加在/url 後,不過我喜歡用 params的帶入,比較清楚。
接著我們加入 worker.yaml,跟 app.yaml 一樣,這個module 用來分配task 的執行,注意handler 的URL 要用login: admin設為secure:
runtime: python27 api_version: 1 threadsafe: true module: worker handlers: - url: /.* script: worker.app login: admin
最後就可以寫真正的handler 了,我們寫在worker.py中,因為我們要操作使用者的資料,因此先把使用者資料庫的model UserMessage獨立到單獨檔案dbmodel.py 中:
Class UserMessage(ndb.Model): user = ndb.StringProperty() isReady = ndb.BooleanProperty()
Worker.py 一樣用 wsgi ,將對 /parse的要求交給 ParseHandler 處理,ParseHandler 可以用 self.request.get(‘key’) 拿到由caller 傳來的資料,我們這裡沒做什麼事,就是取出使用者資料然後把isReady 改為 True 再存回去;為了模擬耗時工作我加了個 sleep(10)
from google.appengine.ext import ndb import webapp2 import logging from dbmodel import UserMessage class ParseHandler(webapp2.RequestHandler): def post(self): time.sleep(10) user_id = str(self.request.get('user_id')) logging.info("user_id: {}".format(user_id)) query = UserMessage.query(UserMessage.user == user_id) userdata = query.fetch() if len(userdata) != 0: userdata = userdata[0] userdata.isReady = True userdata.put() app = webapp2.WSGIApplication([ ('/parse', ParseHandler) ], debug=True)
執行是最關鍵的一步了,平常是要測試的話只要
dev_appserver.py .
就好,因為我們多了一個 worker.yaml ,所以要指定它把 worker.yaml 也考慮進來,又因為兩個yaml 在application ID上會衝突,會出現 “More than one application ID found: dev~None, dev~application_id”,所以要明確指定ID:
dev_appserver.py -A application_id app.yaml worker.yaml
這樣才能正常的執行,這花了我超多時間,最後是看了google 範例code 裡,standard/taskqueue/counter 的readme 才知道要這樣執行…https://github.com/GoogleCloudPlatform/python-docs-samples
實際上來說,我們也可以在add taskqueue 的時候不指定target,然後在app.yaml 的handler 把/parse 交由 worker.app 處理,這樣就不需要分兩個yaml 了。
驗證有沒有動的話,我是有個 /view 的頁面,會去監看那個資料庫裡isReady 的狀態,並顯示這個狀態,在發動taskqueue 之後,重新整理一下網頁,就會看到狀態更新了。
另外taskqueue 也可以透過 queue.yaml 來設定queue 的名稱,update rate ,每個task 可以使用的空間上限等屬性,不過我們還是個小服務所以沒用上這個設定,這裡就不細講了。
https://cloud.google.com/appengine/docs/python/config/queueref
以上大概就是taskqueue 的小小整理,之後就是把 sleep(10) 換成真正重要的工作了。
本文感謝強者我同學 NNNN 大大的指導。
沒有留言:
張貼留言