"""CI helper that publishes a Windows app package to the IoT platform.

The script uploads a logo and a package file to the file service, registers
the package with the platform API, and writes a notification template to
``./.tpl``.  All configuration (service URLs, names, versions, file paths) is
read from environment variables.
"""
import hashlib
import json
import os
import ssl
import subprocess
import sys
from hashlib import md5
from typing import Optional

import requests
from wget import download

# Content type sent with every JSON request to the IoT platform API.
_JSON_HEADERS = {'content-type': 'application/json;charset=UTF-8'}


def _post_json(api_path, body):
    """POST *body* as JSON to ``$url_for_iotplatform`` + *api_path*.

    Returns the ``requests`` response object unchanged.
    """
    url = os.getenv('url_for_iotplatform') + api_path
    return requests.post(url, headers=_JSON_HEADERS, data=json.dumps(body))


def file_write(file_name, text_content):
    """Write ``str(text_content)`` to *file_name*, replacing existing content."""
    # Explicit UTF-8: the notification template contains non-ASCII text, and
    # the locale default encoding may not be able to encode it.
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(str(text_content))


def dingtalk(cdn_url, packageUrl):
    """Render the notification template, print it and save it to ``./.tpl``.

    Args:
        cdn_url: Prefix of the download link.
        packageUrl: Path of the uploaded package; appended to *cdn_url*.
    """
    # The bracketed ``[TPL_...]`` / ``[CI_...]`` markers are written out
    # literally; presumably a later pipeline step substitutes them.
    # NOTE(review): the template is kept on a single line exactly as it
    # appears in the reviewed source; confirm whether it was meant to
    # contain line breaks between the fields.
    tpl_for_dingtalk = f''' 下载地址: ({cdn_url}{packageUrl}) ![image]([TPL_STATUS_PIC]) 项目名称:[PLUGIN_TPL_REPO_SHORT_NAME] 更新作者: [CI_COMMIT_AUTHOR_NAME] 更新内容:[TPL_COMMIT_MSG] [ [查看构建详情)]]([TPL_BUILD_LINK]) '''
    print(tpl_for_dingtalk)
    file_write("./.tpl", tpl_for_dingtalk)


def obsUpload(file, uploadType):
    """Upload a local file to the file service.

    Args:
        file: Path of the file to upload.
        uploadType: Value sent as the ``uploadType`` form field.

    Returns:
        The ``data`` member of the JSON response.  Callers in this file read
        its ``code`` and ``url`` keys.
    """
    url_obsUpload = os.getenv('url_for_iotfile') + "/api/file/v1/attachment/obsUpload"
    # The context manager closes the handle once the request has completed.
    with open(file, 'rb') as upload_handle:
        r = requests.post(
            url_obsUpload,
            data={'uploadType': uploadType},
            files={'file': upload_handle},
        )
    print(r.text)
    return json.loads(r.text)['data']


# App management - add app
def platformAppAdd(package, suffix=None):
    """Upload the logo and *package*, then register the app on the platform.

    Args:
        package: Path of the package file to upload.  When empty, the path
            from the ``package`` environment variable is used instead.
        suffix: Optional text appended to the app name taken from the
            ``name`` environment variable.
    """
    logoFile = obsUpload(file=str(os.getenv('logo')), uploadType='logo')
    # Upload the file the caller asked for.  Reading only $package here would
    # make every call upload the same file regardless of the argument.
    package_path = package if package else str(os.getenv('package'))
    packageFile = obsUpload(file=package_path, uploadType='compress')

    name = os.getenv('name')
    if suffix is not None:
        name = name + str(suffix)

    bodyjson = {
        'name': name,
        'version': os.getenv('version'),
        'genre': os.getenv('genre'),
        'logoCode': logoFile['code'],
        'logoUrl': logoFile['url'],
        'packageCode': packageFile['code'],
        'packageUrl': packageFile['url'],
        'des': os.getenv('des'),
    }
    print(bodyjson)
    r = _post_json("/api/platform/v1/platformApp/platformAppAdd", bodyjson)
    print(r.text)

    dingtalk(os.getenv('cdn_url'), packageFile['url'])


# Program management - authorization (fetch the code list)
def getPlatformProgramList():
    """Return the ``code`` of the first program matching name/version/genre.

    The filter values come from the ``name``, ``version`` and ``genre``
    environment variables.  Raises ``IndexError`` when the platform returns
    an empty list.
    """
    bodyjson = {
        'name': os.getenv('name'),
        'version': os.getenv('version'),
        'genre': os.getenv('genre'),
    }
    print(bodyjson)
    r = _post_json(
        "/api/platform/v1/platformProgram/getPlatformProgramList/1/10", bodyjson
    )
    # Parse the response once and reuse the result.
    payload = json.loads(r.text)
    print(type(payload))
    code = payload['data']["listObject"][0]['code']
    print(code)
    return code


# Program management - authorization
def multipleProgramToOneProject():
    """Authorize the program found by ``getPlatformProgramList`` for a project.

    The project is identified by the ``projectCode`` environment variable.
    """
    program_code = getPlatformProgramList()
    bodyjson = {
        "projectCode": os.getenv('projectCode'),
        "codeList": [program_code],
    }
    print(bodyjson)
    r = _post_json(
        "/api/platform/v1/platformProgram/multipleProgramToOneProject", bodyjson
    )
    print(r.text)


# Program management - add program
def platformProgramAdd():
    """Upload logo and package from the environment and register a program.

    File paths come from the ``logo`` and ``package`` environment variables;
    the remaining request fields come from ``name``, ``version``, ``genre``,
    ``sourceId``, ``resolution`` and ``des``.
    """
    logoFile = obsUpload(file=str(os.getenv('logo')), uploadType='logo')
    packageFile = obsUpload(file=str(os.getenv('package')), uploadType='compress')
    bodyjson = {
        'name': os.getenv('name'),
        'version': os.getenv('version'),
        'genre': os.getenv('genre'),
        'sourceId': os.getenv('sourceId'),
        "resolution": os.getenv('resolution'),
        'logoCode': logoFile['code'],
        'logoUrl': logoFile['url'],
        'packageCode': packageFile['code'],
        'packageUrl': packageFile['url'],
        'des': os.getenv('des'),
    }
    print(bodyjson)
    r = _post_json("/api/platform/v1/platformProgram/platformProgramAdd", bodyjson)
    print(r.text)

    dingtalk(os.getenv('cdn_url'), packageFile['url'])
    # Automatic authorization is currently disabled:
    # multipleProgramToOneProject()


def get_the_last_windowsAppZIP(name) -> Optional[str]:
    """Download the most recently listed ``.zip`` package of app *name*.

    Queries the first page (10 entries) of the platform's app list and takes
    the first entry whose ``packageName`` ends with ``.zip``.

    Returns:
        Path of the downloaded file inside the current working directory, or
        ``None`` when no entry with a ``.zip`` package and a package URL is
        listed.
    """
    bodyjson = {'name': name}
    print(bodyjson)
    r = _post_json("/api/platform/v1/platformApp/getPlatformAppList/1/10", bodyjson)
    listObject = json.loads(r.text)['data']['listObject']

    packageUrl = None
    for obj in listObject or []:
        # ``or ""`` also covers an explicit null packageName.
        if (obj.get("packageName") or "").endswith(".zip"):
            print(f"id: {obj.get('id')}")
            print(f"name: {obj.get('name')}")
            print(f"packageName: {obj.get('packageName')}")
            print(f"packageUrl: {obj.get('packageUrl')}")
            packageUrl = obj.get('packageUrl')
            break  # stop at the first matching entry

    if not packageUrl:
        # Nothing to compare against; callers treat this as a first upload.
        return None

    # Skip SSL certificate verification: wget may hit SSL problems on Linux.
    # This replaces the process-wide default HTTPS context.
    ssl._create_default_https_context = ssl._create_unverified_context
    print('正在下载上一个版本的zip文件:')
    # Read the CDN prefix here so the function does not depend on a module
    # global that only exists when the script entry branch has run.
    download_url = os.getenv('cdn_url') + packageUrl
    download(download_url)
    print(download_url)
    return os.getcwd() + '/' + packageUrl.split("/")[-1]


def build_for_windowsApp():
    """Build the installer with ``makensis`` and print its size."""
    print("开始打包exe流程")
    os.system('makensis Sources/nbn.nsi')
    os.system('du -sh Sources/Setup.exe')


def calculate_md5(file_path) -> str:
    """Return the hexadecimal MD5 digest of the file at *file_path*."""
    digest = hashlib.md5()
    with open(file_path, 'rb') as f:
        # Read in 4 KiB chunks so the file is never loaded whole.
        for chunk in iter(lambda: f.read(4096), b''):
            digest.update(chunk)
    return digest.hexdigest()


def _upload_zip_and_installer(zip_path):
    """Publish *zip_path*, build the installer, then publish the installer."""
    platformAppAdd(zip_path, '.zip')
    build_for_windowsApp()
    platformAppAdd('/drone/src/Sources/Setup.exe', '.exe')


def build_the_current_windowsApp(name):
    """Publish the current build of app *name* if it differs from the last one.

    The current zip is compared (by MD5) with the most recently published
    zip.  When nothing was published before, the current zip and a freshly
    built installer are uploaded.  When the digests differ, both archives are
    unpacked, ``/drone/src/zip_diff.sh`` is run, and the zip expected under
    ``/tmp/IOTContainer/`` is uploaded together with a freshly built
    installer.  When the digests match, nothing is uploaded.
    """
    zip_name = os.getenv('DRONE_REPO_NAME') + '.' + os.getenv('version') + '.zip'
    the_current_windowsAppZIP = "/drone/src/IOTContainer/" + zip_name
    the_last_windowsAppZIP = get_the_last_windowsAppZIP(name)

    if the_last_windowsAppZIP is None:
        print("初次上传,不比对zip,直接打包上传")
        _upload_zip_and_installer(the_current_windowsAppZIP)
        return

    print("比对新老版本zip文件")
    last_digest = calculate_md5(the_last_windowsAppZIP)
    current_digest = calculate_md5(the_current_windowsAppZIP)
    if last_digest == current_digest:
        print("\033[34m比对结果: 程序版本无更新,流程终止\033[0m")
        return

    print("比对结果:程序版本已更新")
    for directory in ('/tmp/a/', '/tmp/b/', '/tmp/IOTContainer/'):
        os.makedirs(directory, exist_ok=True)
    # Argument lists (no shell) keep paths with spaces or shell
    # metacharacters from being misinterpreted.
    subprocess.run(['unzip', the_last_windowsAppZIP, '-d', '/tmp/a/'])
    subprocess.run(['unzip', the_current_windowsAppZIP, '-d', '/tmp/b/'])
    subprocess.run(['/bin/sh', '-c', '/drone/src/zip_diff.sh'])
    # NOTE(review): zip_diff.sh is expected to leave its result under
    # /tmp/IOTContainer/ with the same file name - confirm against the script.
    _upload_zip_and_installer('/tmp/IOTContainer/' + zip_name)


# Script entry: runs whenever this module is executed or imported, unless the
# ``isiot`` environment variable is exactly 'False'.
if os.getenv("isiot") != 'False':
    name = os.getenv('name')
    cdn_url = os.getenv('cdn_url')
    build_the_current_windowsApp(name)
else:
    print("is not iot")