diff --git a/pywidevine/serve.py b/pywidevine/serve.py index 783506e..9becf70 100644 --- a/pywidevine/serve.py +++ b/pywidevine/serve.py @@ -2,7 +2,6 @@ import base64 import sys from pathlib import Path from typing import Optional, Union -from uuid import uuid4, UUID try: from aiohttp import web @@ -22,7 +21,7 @@ routes = web.RouteTableDef() async def _startup(app: web.Application): - app["sessions"]: dict[UUID, Cdm] = {} + app["sessions"]: dict[bytes, Cdm] = {} app["config"]["devices"] = { path.stem: path for x in app["config"]["devices"] @@ -45,49 +44,75 @@ async def ping(_) -> web.Response: }) +@routes.get("/open/{device}") +async def open(request: web.Request) -> web.Response: + user = request.app["config"]["users"][request.headers["X-Secret-Key"]] + device = request.match_info["device"] + + if device not in user["devices"] or device not in request.app["config"]["devices"]: + # we don't want to be verbose with the error as to not reveal device names + # by trial and error to users that are not authorized to use them + return web.json_response({ + "status": 403, + "message": f"Device '{device}' is not found or you are not authorized to use it." + }, status=403) + + device = Device.load(request.app["config"]["devices"][device]) + + cdm = Cdm(device) + session_id = cdm.open() + request.app["sessions"][session_id] = cdm + + return web.json_response({ + "status": 200, + "message": "Success", + "data": { + "session_id": session_id.hex(), + "device": { + "system_id": device.system_id, + "security_level": device.security_level + } + } + }) + + @routes.post("/challenge/{license_type}") async def challenge(request: web.Request) -> web.Response: - user = request.app["config"]["users"][request.headers["X-Secret-Key"]] - session_id = uuid4() - body = await request.json() - for required_field in ("device_name", "init_data"): + for required_field in ("session_id", "init_data"): if not body.get(required_field): return web.json_response({ "status": 400, "message": f"Missing required field '{required_field}' in JSON body." }, status=400) - # load device - device_name = body["device_name"] - if device_name not in user["devices"] or device_name not in request.app["config"]["devices"]: - # we don't want to be verbose with the error as to not reveal device names - # by trial and error to users that are not authorized to use them + # get session id + session_id = bytes.fromhex(body["session_id"]) + + # get cdm + if session_id not in request.app["sessions"]: + # e.g., app["sessions"] being cleared on server crash, reboot, and such + # or, the license message was from a challenge that was not made by our Cdm return web.json_response({ - "status": 403, - "message": f"Device '{device_name}' is not found or you are not authorized to use it." - }, status=403) - device = Device.load(request.app["config"]["devices"][device_name]) + "status": 400, + "message": "Invalid Session ID. Session ID may have Expired." + }, status=400) + cdm = request.app["sessions"][session_id] - # load init data - init_data = body["init_data"] - - # load service certificate + # set service certificate service_certificate = body.get("service_certificate") if request.app["config"]["force_privacy_mode"] and not service_certificate: return web.json_response({ "status": 403, "message": "No Service Certificate provided but Privacy Mode is Enforced." }, status=403) - - # load cdm - cdm = Cdm(device, init_data) if service_certificate: - cdm.set_service_certificate(service_certificate) - request.app["sessions"][session_id] = cdm + cdm.set_service_certificate(session_id, service_certificate) # get challenge license_request = cdm.get_license_challenge( + session_id=session_id, + init_data=body["init_data"], type_=LicenseType.Value(request.match_info["license_type"]), privacy_mode=True ) @@ -96,7 +121,6 @@ async def challenge(request: web.Request) -> web.Response: "status": 200, "message": "Success", "data": { - "session_id": session_id.hex, "challenge_b64": base64.b64encode(license_request).decode() } }, status=200) @@ -112,6 +136,9 @@ async def keys(request: web.Request) -> web.Response: "message": f"Missing required field '{required_field}' in JSON body." }, status=400) + # get session id + session_id = bytes.fromhex(body["session_id"]) + # get key type key_type = request.match_info["key_type"] try: @@ -125,8 +152,7 @@ async def keys(request: web.Request) -> web.Response: "message": f"The Key Type value is invalid, {e}" }, status=400) - # load cdm session - session_id = UUID(hex=body["session_id"]) + # get cdm if session_id not in request.app["sessions"]: # e.g., app["sessions"] being cleared on server crash, reboot, and such # or, the license message was from a challenge that was not made by our Cdm @@ -137,6 +163,9 @@ async def keys(request: web.Request) -> web.Response: cdm = request.app["sessions"][session_id] # parse the license message + cdm.parse_license(session_id, body["license_message"]) + + # prepare the keys license_keys = [ { "key_id": key.kid.hex, @@ -144,7 +173,7 @@ async def keys(request: web.Request) -> web.Response: "type": key.type, "permissions": key.permissions, } - for key in cdm.parse_license(body["license_message"]) + for key in cdm._sessions[session_id].keys if key.type == key_type ]