| #!/usr/bin/env python3 |
| |
| import sys |
| import os |
| import re |
| import requests |
| from requests.auth import HTTPBasicAuth |
| from urllib.parse import urljoin |
| import urllib.request |
| import json |
| import time |
| from os.path import expanduser |
| import jenkins |
| |
| |
# Base URL of the Jenkins folder under which every signing job lives.
serverRootUrl = "https://jenkins-sh.amlogic.com/job/Security/job/"

# Per-user configuration file that init() reads the credentials from.
homeConfigFilePath = "~/.sign.cfg"

# Values accepted by --type.
types = [
    "ta",
    "vmxta",
    "irdetota",
    "bl32",
    "bl31",
    "bl2",
    "bl2e",
    "bl2x",
    "bl40",
    "aucpufw",
    "vdecfw",
]

# Values accepted by --casProvider; the empty string is the default.
casProviders = [
    "",
    "VMX",
    "nagra",
    "nagra-dev",
    "vo-dev",
    "vo",
    "gs-dev",
    "gs",
    "irdeto",
]

# Known DDR types; the first entry is the default of --ddrType.
ddrTypes = [
    "ddr4",
    "lpddr4",
    "ddr3",
    "lpddr3",
    "lpddr4_lpddr5",
]

# Values accepted by --chipVariant; the empty string is the default.
chipVariants = [
    "",
    "general",
    "nocs-jts-ap",
    "nocs-prod",
    "onboot",
]

# Values accepted by --csSigScheme and --dvSigScheme respectively.
csSigSchemes = ["", "rsa", "rsa-mldsa"]
dvSigSchemes = ["", "rsa", "rsa-mldsa"]

# Credentials and API clients; all four are filled in by init().
user = ""
password = ""
auth = None
server = None
| |
| |
def init():
    """Load the Jenkins credentials and create the shared API clients.

    The configuration file is looked up first at ``homeConfigFilePath``
    (with ``~`` expanded) and then as ``config.py`` in ``sys.path[0]``.
    Lines of the form ``user = '...'`` and ``password = '...'`` are read
    from it; every other line is ignored.

    Side effects: sets the module globals ``user``, ``password``, ``auth``
    and ``server``. Prints a message and exits with status 1 when neither
    configuration file exists.
    """
    global user
    global password
    global auth
    global server

    expandedHomeConfigFilePath = expanduser(homeConfigFilePath)
    configPyPath = os.path.join(sys.path[0], "config.py")
    if os.path.exists(expandedHomeConfigFilePath):
        configFilePath = expandedHomeConfigFilePath
    elif os.path.exists(configPyPath):
        configFilePath = configPyPath
    else:
        print(
            "Can't find configuration file. Please create configuration file .sign.cfg at your home directory."
        )
        sys.exit(1)

    with open(configFilePath, "r") as configFile:
        for line in configFile:
            # Split on the first "=" only, so a value that itself contains
            # "=" (typically a password) is not truncated.
            key, separator, value = line.partition("=")
            if not separator:
                # Not a key/value line; nothing to read from it.
                continue
            key = key.strip()
            # Values may be written quoted (config.py style); drop the quotes.
            value = value.strip().replace("'", "")
            if key == "user":
                user = value
            elif key == "password":
                password = value

    auth = HTTPBasicAuth(user, password)
    server = jenkins.Jenkins(
        "https://jenkins-sh.amlogic.com", username=user, password=password
    )
| |
| |
def get_args(argv=None):
    """Parse the command line of the signing client.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which is the
            behavior of calling ``get_args()`` without arguments.

    Returns:
        The ``argparse.Namespace`` holding the parsed options.
    """
    from argparse import ArgumentParser

    parser = ArgumentParser()

    parser.add_argument("--in", dest="inputFilePath", required=True, help="input file")
    parser.add_argument(
        "--chipAcsFile", dest="chipAcsFilePath", default="null", help="chip acs file"
    )
    parser.add_argument(
        "--out", dest="outFilePath", type=str, default="", help="output signed file"
    )
    parser.add_argument(
        "-v", "--version", action="version", version="%(prog)s 1.0", help="version"
    )
    # --type is required, so a default would never be applied.
    parser.add_argument("--type", choices=types, required=True)
    parser.add_argument("--chip", type=str)
    parser.add_argument("--taVersion", type=int, default=0)
    parser.add_argument("--marketId", type=str, default="null")
    parser.add_argument("--casProvider", choices=casProviders, default=casProviders[0])
    parser.add_argument("--ddrType", type=str, default=ddrTypes[0])
    parser.add_argument("--chipVariant", choices=chipVariants, default=chipVariants[0])
    parser.add_argument("--keyType", type=str, dest="keyType", default="dev-keys")
    parser.add_argument("--extraArgs", type=str, default="")
    parser.add_argument("--csSigScheme", choices=csSigSchemes, default=csSigSchemes[0])
    parser.add_argument("--dvSigScheme", choices=dvSigSchemes, default=dvSigSchemes[0])
    parser.add_argument("--testService", type=int, default=0)

    return parser.parse_args(argv)
| |
| |
def getLastBuildNumber(rootJobUrl):
    """Ask Jenkins for the number of the most recent build of a job.

    Args:
        rootJobUrl: URL of the job; ``lastBuild/buildNumber`` is resolved
            against it with ``urljoin``.

    Returns:
        The response body as text when the server answers 200, otherwise
        the integer 0 after printing the HTTP status code.
    """
    reply = requests.get(urljoin(rootJobUrl, "lastBuild/buildNumber"), auth=auth)

    if reply.status_code != 200:
        failureCode = str(reply.status_code)
        print("Fail to get last build number due to the error " + failureCode)
        return 0

    return reply.text
| |
| |
def getJobRootUrl(type):
    """Return the URL of the Jenkins job that signs the given firmware type.

    Args:
        type: Firmware type string, as accepted by ``--type``.

    Returns:
        ``serverRootUrl`` joined with the job path for ``type``. Any type
        without its own entry uses the shared bl2e/bl2x/bl40 job.
    """
    jobPathByType = {
        "ta": "Signing/job/Sign_TA/",
        "vmxta": "CAS/job/VMX/job/VMX_TA_Sign/",
        "irdetota": "Signing/job/Sign_Irdeto_TA_for_s905c1/",
        "bl31": "Signing/job/Sign_Bl31/",
        "bl2": "Signing/job/Sign_Bl2/",
        "bl32": "Signing/job/Sign_Bl32/",
        "aucpufw": "Signing/job/Sign_AUCPU_FW/",
        "vdecfw": "Signing/job/Sign_VDEC_FW/",
    }
    # bl2e, bl2x, bl40
    sharedJobPath = "Signing/job/Sign_Bl2e_Bl2x_Bl40/"

    return urljoin(serverRootUrl, jobPathByType.get(type, sharedJobPath))
| |
| |
def getJobName(type):
    """Return the Jenkins job name used for the given firmware type.

    Args:
        type: Firmware type string, as accepted by ``--type``.

    Returns:
        The job name for ``type``. Any type without its own entry uses
        the shared bl2e/bl2x/bl40 job name.
    """
    # NOTE(review): for "vmxta" the job URL in getJobRootUrl ends in
    # "VMX_TA_Sign" while the name here is "Sign_VMX_TA" - confirm which
    # one the Jenkins queue actually reports.
    jobNameByType = {
        "ta": "Sign_TA",
        "vmxta": "Sign_VMX_TA",
        "irdetota": "Sign_Irdeto_TA_for_s905c1",
        "bl31": "Sign_Bl31",
        "bl2": "Sign_Bl2",
        "bl32": "Sign_Bl32",
        "aucpufw": "Sign_AUCPU_FW",
        "vdecfw": "Sign_VDEC_FW",
    }
    # bl2e, bl2x, bl40
    return jobNameByType.get(type, "Sign_Bl2e_Bl2x_Bl40")
| |
| |
def submitSignJob(
    type,
    chipType,
    inputFilePath,
    chipAcsFilePath,
    taVersion="0",
    marketId="",
    casProvider="",
    chipVariant="",
    ddrType="",
    keyType="dev-keys",
    extraArgs="",
    csSigScheme="",
    dvSigScheme="",
    testService=0,
):
    """Upload the input file and start the Jenkins signing job for ``type``.

    The request is a POST to ``<job url>buildWithParameters`` carrying the
    input file as the ``file`` form field plus a set of build parameters
    that depends on ``type``:

    * ta / vmxta / irdetota: chip_part_number, ta_version, market_id
    * bl32: chipPartNumber, chipVariant, casProvider, keyType
    * bl2: chipPartNumber, chipVariant, ddrType, keyType, and the file at
      ``chipAcsFilePath`` uploaded as ``chipAcsFile``
    * everything else (bl2e, bl2x, bl31, bl40, aucpufw, vdecfw):
      chipPartNumber, chipVariant, keyType, extraArgs

    ``csSigScheme``, ``dvSigScheme`` and ``testService`` are sent for every
    type. Arguments that do not apply to ``type`` are ignored.

    Prints a confirmation when the server answers 201; any other status is
    printed and the process exits with status 1.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import list.
    from contextlib import ExitStack

    url = getJobRootUrl(type) + "buildWithParameters"

    if type == "ta" or type == "vmxta" or type == "irdetota":
        data = {
            "chip_part_number": chipType,
            "ta_version": taVersion,
            "market_id": marketId,
        }
    elif type == "bl32":
        data = {
            "chipPartNumber": chipType,
            "chipVariant": chipVariant,
            "casProvider": casProvider,
            "keyType": keyType,
        }
    elif type == "bl2":
        data = {
            "chipPartNumber": chipType,
            "chipVariant": chipVariant,
            "ddrType": ddrType,
            "keyType": keyType,
        }
    else:  # bl2e, bl2x, bl31, bl40, aucpufw, vdecfw
        data = {
            "chipPartNumber": chipType,
            "chipVariant": chipVariant,
            "keyType": keyType,
            "extraArgs": extraArgs,
        }
    # Parameters shared by every job, appended last in every case.
    data.update(
        {
            "csSigScheme": csSigScheme,
            "dvSigScheme": dvSigScheme,
            "testService": testService,
        }
    )

    # ExitStack closes every uploaded file once the request has completed,
    # including when open() or the POST raises.
    with ExitStack() as openedFiles:
        uploadFile = {
            "file": (
                os.path.basename(inputFilePath),
                openedFiles.enter_context(open(inputFilePath, "rb")),
            ),
        }
        if type == "bl2":
            uploadFile["chipAcsFile"] = (
                os.path.basename(chipAcsFilePath),
                openedFiles.enter_context(open(chipAcsFilePath, "rb")),
            )

        response = requests.post(url, auth=auth, data=data, files=uploadFile)

    if response.status_code == 201:
        print("Sumbit signing job successfully, please wait...")
    else:
        print(
            "Fail to start signing job due to the error: " + str(response.status_code)
        )
        sys.exit(1)
| |
| |
def queryBuildStatus(rootJobUrl, buildNumber):
    """Report whether a given build of a job is still running.

    Args:
        rootJobUrl: URL of the job, ending with ``/``.
        buildNumber: Number of the build to query.

    Returns:
        The string form of the ``building`` field of the build's JSON
        description when the server answers 200, otherwise ``"NotStart"``.
    """
    statusUrl = "".join([rootJobUrl, str(buildNumber), "/api/json?tree=building"])
    reply = requests.get(statusUrl, auth=auth)

    if reply.status_code != 200:
        return "NotStart"

    buildInfo = json.loads(reply.text)
    return str(buildInfo["building"])
| |
def queryBuildUser(rootJobUrl, buildNumber):
    """Return the id of the user recorded as the cause of a build.

    Args:
        rootJobUrl: URL of the job, ending with ``/``.
        buildNumber: Number of the build to query.

    Returns:
        The ``userId`` of the first cause that carries one, as a string.
        ``"Error"`` when the server does not answer 200 or when no cause
        in the response has a ``userId``.
    """
    url = rootJobUrl + str(buildNumber) + "/api/json?tree=actions[causes[userId]]"

    response = requests.get(url, auth=auth)
    if response.status_code != 200:
        return "Error"

    result = json.loads(response.text)
    # Search every action rather than assuming a fixed position in the list;
    # actions without causes come back as empty objects and are skipped.
    for action in result.get("actions") or []:
        for cause in (action or {}).get("causes") or []:
            if "userId" in cause:
                return str(cause["userId"])

    return "Error"
| |
def checkBuildParameter(rootJobUrl, buildNumber, argsChip, argsCasProvider, argsKeyType):
    """Check that a build was started with the parameters of this request.

    Reads the build's JSON description and compares its string parameters
    ``chipPartNumber``, ``casProvider`` and ``keyType`` with the values
    passed in. Parameters the build does not have are not checked.

    Args:
        rootJobUrl: URL of the job, ending with ``/``.
        buildNumber: Number of the build to inspect.
        argsChip: Expected value of ``chipPartNumber``.
        argsCasProvider: Expected value of ``casProvider``.
        argsKeyType: Expected value of ``keyType``.

    Returns:
        ``True`` when no checked parameter differs. ``False`` after
        printing a message when one differs or when the build description
        cannot be fetched.
    """
    job = rootJobUrl + str(buildNumber)
    url = job + "/api/json"

    response = requests.get(url, auth=auth)
    if response.status_code != 200:
        # Without the description the build cannot be verified; report a
        # mismatch rather than failing while decoding an error page.
        print(
            "Fail to query parameters of the jenkins Build job %s due to the error %s"
            % (job, response.status_code)
        )
        return False
    data = response.json()

    # parameter name -> (label used in the message, expected value)
    expected = {
        "chipPartNumber": ("chip", argsChip),
        "casProvider": ("casProvider", argsCasProvider),
        "keyType": ("keyType", argsKeyType),
    }

    for action in data.get("actions", []):
        for parameter in action.get("parameters", []):
            if parameter.get("_class") != "hudson.model.StringParameterValue":
                continue
            name = parameter.get("name")
            if name not in expected:
                continue
            label, expectedValue = expected[name]
            if parameter.get("value") != expectedValue:
                print(
                    "The jenkins Build job %s %s value doesn't match with yours."
                    % (job, label)
                )
                return False

    return True
| |
def downloadSignedFile(rootJobUrl, buildNumber, inFileDir="", specifiedOutFilePath=""):
    """Download the first artifact of a finished build to a local file.

    Args:
        rootJobUrl: URL of the job, ending with ``/``.
        buildNumber: Number of the build whose artifact is fetched.
        inFileDir: Directory used for the output file when
            ``specifiedOutFilePath`` is empty; the file keeps the
            artifact's base name.
        specifiedOutFilePath: Explicit output path; takes precedence over
            ``inFileDir`` when non-empty.

    Returns:
        0 after the file has been written.

    Exits with status 1 after printing a message when the artifact list
    cannot be fetched, the build has no artifacts, or the artifact itself
    cannot be downloaded.
    """
    buildUrl = rootJobUrl + str(buildNumber)

    response = requests.get(
        buildUrl + "/api/json?tree=artifacts[relativePath]", auth=auth
    )
    if response.status_code != 200:
        print("Fail to download the signed file")
        sys.exit(1)

    artifacts = json.loads(response.text)["artifacts"]
    if not artifacts:
        print("Fail to build, please check jenkins log for detailed error")
        sys.exit(1)

    relativePath = artifacts[0]["relativePath"]
    # e.g. http://127.0.0.1:8080/job/Sign_Bl31/46/artifact/46/output/bl31-payload.bin.signed
    downloadUrl = buildUrl + "/artifact/" + relativePath

    if specifiedOutFilePath:
        outFilePath = specifiedOutFilePath
    else:
        outFilePath = os.path.join(inFileDir, os.path.basename(relativePath))

    download = requests.get(downloadUrl, auth=auth)
    if download.status_code != 200:
        # Never write an error page to disk as if it were the signed file.
        print(
            "Fail to download the signed file due to the error: "
            + str(download.status_code)
        )
        sys.exit(1)

    with open(outFilePath, "wb") as f:
        f.write(download.content)
    print("Download the signed file at " + outFilePath)
    return 0
| |
| |
def waitForSubmit(type):
    """Block until no queued Jenkins item belongs to the job for ``type``.

    The build queue is read through the module-level ``server`` client.
    While any queued item's task name equals ``getJobName(type)``, the
    function sleeps one second, prints a notice and checks again.

    Args:
        type: Firmware type string, as accepted by ``--type``.
    """
    wantedJobName = getJobName(type)

    while True:
        pendingItems = server.get_queue_info()
        sameJobQueued = bool(pendingItems) and any(
            item["task"].get("name", "") == wantedJobName for item in pendingItems
        )

        if not sameJobQueued:
            print("It is your turn to submit your signing job now.")
            return

        time.sleep(1)
        print(
            "Otherone is signing same firmware as you request. Please wait them to complete."
        )
| |
| |
def main():
    """Run one signing request end to end.

    Steps: parse the command line, load the credentials, wait until the
    target job has no queued item, submit the job, poll the expected build
    once per second until it is no longer building, verify that the build
    belongs to this user and carries this request's parameters, then
    download the signed file.

    Exits with status 1 when the build's user or parameters do not match.
    """
    print(sys.argv)
    # Parse the arguments before loading the configuration so that --help,
    # --version and usage errors do not require a configuration file.
    args = get_args()
    init()

    rootJobUrl = getJobRootUrl(args.type)

    waitForSubmit(args.type)
    lastBuildNumber = getLastBuildNumber(rootJobUrl)
    submitSignJob(
        type=args.type,
        chipType=args.chip,
        inputFilePath=args.inputFilePath,
        chipAcsFilePath=args.chipAcsFilePath,
        taVersion=args.taVersion,
        marketId=args.marketId,
        casProvider=args.casProvider,
        chipVariant=args.chipVariant,
        ddrType=args.ddrType,
        keyType=args.keyType,
        extraArgs=args.extraArgs,
        csSigScheme=args.csSigScheme,
        dvSigScheme=args.dvSigScheme,
        testService=args.testService,
    )

    # The submitted build is assumed to be the one right after the last
    # build seen before submitting; the user and parameter checks below
    # catch the case where another request took that number.
    buildNumber = int(lastBuildNumber) + 1
    print("The jenkins build number: " + str(buildNumber))
    while True:
        time.sleep(1)
        building = queryBuildStatus(rootJobUrl, buildNumber)
        print("Building Status= " + str(building))
        if building == "False":
            userId = queryBuildUser(rootJobUrl, buildNumber)
            if userId.lower() != user.lower():
                print("The jenkins Build number user name doesn't match yours.It may be caused by too many signing requests submit to the same job.")
                print("Please wait a moment, and try again.")
                sys.exit(1)

            status = checkBuildParameter(rootJobUrl, buildNumber, args.chip, args.casProvider, args.keyType)
            if not status:
                print("It may be caused by too many signing requests submit to the same job.")
                print("Please wait a moment, and try again.")
                sys.exit(1)

            print("Build is done. Will start to download the signed file")
            break

    inputFileDir = os.path.dirname(args.inputFilePath)
    downloadSignedFile(rootJobUrl, buildNumber, inputFileDir, args.outFilePath)


if __name__ == "__main__":
    main()