本文最后更新于 2024年5月9日 凌晨
自用笔记
常规 字符串 字符串截取 截取ccc
1 2 str = "aaa/bbb/ccc" ;print ( str [ str .rindex( '/' ) + 1 : len ( str ) ] );
判断为空
如果 message 值经过 strip() 方法处理后为空字符串、纯空格字符串或者 None,那么条件判断表达式的值为 True,否则为 False。
移除字符 取出回车换行空格
1 2 3 a = a .replace ('\n' , '' )a = a .replace ('\r' , '' )a = a .replace (' ' , '' )
找子串 find,如果包含子字符串返回开始的索引值,否则返回-1。
1 2 info = 'abca' print info.find ('a' ,0)##从下标0开始
index 找不到会报异常
1 2 info = 'abca' print info.index('a' )
判断后缀 1 2 3 path = "test.py" bool = path.endswith(".py" ) print(bool)
截取后缀 1 2 3 4 5 6 imgUrl = 'https://modelscope.oss-cn-beijing.aliyuncs.com/test/videos/action_detection_test_video.mp4' videoEndswith = os.path .splitext (imgUrl)[-1] if videoEndswith == "" : print ("无后缀" )else : print (videoEndswith)
截取前缀 1 os.path .splitext (file)[0]
截取链接文件名 1 2 3 4 5 from pathlib import Path import urllib file = "https://url.com/file.txt?auth" file = Path (urllib.parse .unquote (file).split ('?' )[0] ).name print (file)
格式化 1 2 3 print ("网站名:{name}, 地址 {url}" .format(name ="fff" , url ="fff.com" ))"{1} {0} {1}" .format("hello" , "world" ) # 设置指定位置'world hello world'
提取数字 1 print (re.findall(r"\d+\.?\d*" ,temp))
bytes转str 1 2 3 4 5 6 7 8 mac1 = b'\x11\x22\x33\x44\x55\x66' mac2 = binascii.hexlify (mac1) mac3 = str (mac2, encoding = "utf-8" ) mac4 = str (mac1, encoding = "utf-8" )print (type(mac1) ,": " ,mac1)print (type(mac2) ,": " ,mac2)print (type(mac3) ,": " ,mac3)print (type(mac4) ,": " ,mac4)
输出
1 2 3 4 <class 'bytes '> : b '\x11 "3DUf ' <class 'bytes '> : b '112233445566 ' <class 'str '> : 112233445566 <class 'str '> : "3DUf
路径 获取当前文件路径
1 2 current_path = os.path.abspath(__file__ ) J:\code\kuon\brain\__init__ .py
取当前文件的父目录
1 father_path = os .path .abspath(os .path .dirname(current_path) + os .path .sep + "." )
打包exe后上面获取地址就不对了,需要使用下面方式
1 2 #获取当前目录路径 current_path = os .path .dirname(os .path .realpath(sys.executable))
1 2 # 获取当前上一级目录路径 father_path = os .path .dirname(os .path .dirname(os .path .realpath(sys.executable)))
但是上面的方式在不打包时不行,所有还有种方式
1 2 获取当前目录路径 path = os.path .dirname (os.path .realpath (sys.argv [0] ))
最后代码中地址都不要采用./XXX的方式
循环 for 示例:
1 2 3 L = [1 , 2 , 3 , 4 , 5 , 6 ]k = [x+10 for x in L if x%2 !=0 ]
for后的if用于排除不符合条件,for前是用于保存的值
判断 in 判断in 右侧的内容里,是否包含了左侧的内容。 包含返回真,不包含返回假。 not in 右侧的内容里是否不包含左侧的内容。不包含返回真,包含返回假。
判断元素是否在集合中
1 if inputText in self .textAList:
判断类型 1 2 3 4 5 6 isinstance (1 , int )True isinstance (1.0 , float )True isinstance (1 ,(int ,float ))True
any和all 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 recRet=[] a={"x" :1 ,"y" :1 } b={"x" :1 ,"y" :1 } c={"x" :1 ,"y" :-1 } recRet.append (a) recRet.append (b) recRet.append (c)if all(member ["x" ] > 0 for member in recRet): print ("All members in recRet have x greater than 0" ) if any(member ["y" ] < 0 for member in recRet): print ("At least one member in recRet has y smaller than 0" ) print (123 )else : print ("All members in recRet have y greater or equal to 0" )
上述代码将两个条件分别放在不同的if语句中。第一个if语句检查所有成员的x值是否大于0,并输出”All members in recRet have x greater than 0”,若条件不满足则不进行任何输出。
第二个if语句使用any()函数来判断是否存在至少一个成员的y值小于0,若条件成立,则分别输出”At least one member in recRet has y smaller than 0”和123;否则,输出”All members in recRet have y greater or equal to 0”。
随机数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import random random .random ()random .randint(1 ,6 )random .uniform(1 ,6 ) List = [1 ,2 ,3 ,4 ,5 ,6 ]random .choice(List)from random import choice l = [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ] print(choice(l)) chooseNums = sample(images,1 )[0 ]
删除文件 1 2 for file in glob .glob ("./file/*" ): os.remove (file )
数据结构 队列 queue
1 2 3 4 5 6 7 import queue self.msgQueue = queue.Queue () self.msgQueue .put (txt) txt = self.msgQueue .get ()while (self.msgQueue.empty() == False): txt = msgQueue.get ()print (msgQueue.qsize() )
json 字典转json字符串 1 2 3 4 5 impor jsona = {"name" :"张三" ,"age" :18 } # type (a) 是dictb = json.dumps(a) # type (b) 是str
字符串转字典 1 2 3 4 t = "{\" name\" :\" 张三\" ,\" age\" :18}" resp = json.loads(t)print (type (resp))print (resp)
easydict 可以使得以属性的方式去访问字典的值
1 2 3 4 5 6 >>> from easydict import EasyDict as edict>>> d = edict({'foo' :3 , 'bar' :{'x' :1 , 'y' :2 }}) >>> d.foo 3>>> d.bar.x 1
线程 退出所以线程
定时器 1 2 3 4 5 6 7 8 9 10 11 12 import threading def thread_Timer (): print ("11111111111" ) global t1 t1 = threading.Timer (5 ,thread_Timer) t1.start () if __name__ == "__main__" : t1 = threading.Timer (5 , thread_Timer) t1.start () t1.cancel ()
threading 线程 相关函数和属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 threading . active_count ( ) :返回当前存活的Thread 对象数量。threading . current_thread ( ) :返回当前线程的Thread 对象。threading . enumerate ( ) :列表形式返回所有存活的Thread 对象。threading . main_thread ( ) :返回主Thread 对象。Thread 对象的方法及属性:Thread . name :线程的名字,没有语义,可以相同名称。Thread . ident :线程标识符,非零整数。Thread . Daemon :是否为守护线程。Thread . is_alive ( ) :是否存活。Thread . start ( ) :开始线程活动。若多次调用抛出RuntimeError 。Thread . run ( ) :用来重载的,Thread . join ( timeout = None ) :等待直到线程正常或异常结束。尚未开始抛出RuntimeError Thread ( group = None , target = None , name = None , args = ( ) , kwargs = { } , *, deamon = None ) :构造函数
函数建立 1 2 3 4 5 6 7 8 9 10 11 import threadingimport time def show (arg): time .sleep(1 ) print('thread ' +str(arg)+" running....")if __name__ == '__main__' : for i in range(10 ): t = threading.Thread(target=show , args=(i,)) t.start ()
查看线程数量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import threading import time def show (): for i in range (3 ): print ("lalalalla{}" .format (i)) time.sleep (1 ) if __name__ == '__main__' : t1 = threading.Thread (target=show) t2 = threading.Thread (target=show) t3 = threading.Thread (target=show) t1.start () t2.start () t3.start () while True: nowThread = len (threading.enumerate ()) print ("当前线程数量:{}" .format (nowThread)) if (nowThread <= 1 ): break time.sleep (0.5 )
线程类 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import threadingimport timeimport requestsclass netTask (threading.Thread): def __init__ (self ): threading.Thread.__init__(self) self.flag =False self.closeFlag = True self.taskStatus = "" def run (self ): while 1 : if (self.flag): time.sleep(10 ) self.taskStatus = "暂停" if (self.closeFlag == False ): break time.sleep(1 ) def go (self ): if (self.flag == False ): self.flag = True def suspend (self ): self.flag = False self.taskStatus = "暂停" def close (self ): self.closeFlag = False self.taskStatus = "退出" def getStatus (self ): temp = self.taskStatus self.taskStatus = "" return temp
初始化和启动
1 2 3 self.nTask = netTask.netTask () self.nTask .start () self.nTask .go ()
主线程退出是要关闭子线程,可以实现closeEvent
1 2 def closeEvent (self ,event ): self .nTask.close()
APScheduler 非阻塞式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from datetime import datetime import time import osfrom apscheduler.schedulers.background import BackgroundScheduler def tick(): print ('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__' : scheduler = BackgroundScheduler() scheduler.add_job(tick, 'interval' , seconds =3) #间隔3秒钟执行一次 scheduler.add_job(tick, 'date' , run_date ='2022-07-12 09:22' )#指定时间执行一次 #scheduler.add_job(tick, 'interval' , )#指定时间执行一次 #scheduler.add_job(tick,'cron' ,day_of_week ='6' ) #每周六执行一次 scheduler.start() try: while True : time.sleep(2) #其他任务是独立的线程执行 print ('sleep!' ) except (KeyboardInterrupt, SystemExit): scheduler.shutdown() print ('Exit The Job!' )
阻塞式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from datetime import datetimeimport osfrom apscheduler.schedulers.blocking import BlockingScheduler def tick (): print ('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__' : scheduler = BlockingScheduler() scheduler.add_job(tick, 'interval' , seconds=3 ) print ('Press Ctrl+{0} to exit' .format ('Break' if os.name == 'nt' else 'C' )) try : scheduler.start() except (KeyboardInterrupt, SystemExit): scheduler.shutdown() print ('Exit The Job!' )
暂停和恢复
1 2 scheduler.pause scheduler.resume
打包exe会报错,可以改为
1 2 3 4 5 6 7 from apscheduler.schedulers .background import BackgroundScheduler from apscheduler.triggers .interval import IntervalTrigger scheduler = BackgroundScheduler () trigger=IntervalTrigger (seconds=1 ) self.scheduler .add_job (self.uploadClientStatus , 'interval' , seconds=60 ,trigger=trigger) scheduler.start ()
这东西用着是方便,就是打包麻烦
进程 multiprocessin
网络 http请求 get请求
设置超时时间,timeout=(2, 3) ,2表示连接超时,3表示读取超时1 2 param = {"id" :"123" ,"status" :"idle" }response = requests.get(url,param,timeout=(2 , 3 ))
如果发生403,可能是需要加个头1 2 headers = {'User-Agent': 'Mozilla/5 .0 (Macintosh; Intel Mac OS X 10 _11_5) AppleWebKit/537 .36 (KHTML, like Gecko) Chrome/50 .0 .2661 .102 Safari/537 .36 '}result = requests.get(url, headers=headers)
post请求 参数分为data和json
1 2 3 param = {"taskNo" :taskNo,"errorMessage" :errorMessage,"canPrint" :canPrint} try :r = requests.post(url,json=param )
form-data上传图片
1 2 3 4 5 6 7 8 9 10 11 12 13 import requests from requests_toolbelt.multipart .encoder import MultipartEncoder url = "http://47.108.53.108:2077/api/file/upload" file = open ('D:\\code\\face\\my\\FaceLib\\independent\\out\\test.jpg' , 'rb' ) multipart_encoder = MultipartEncoder ( fields={ "file" :("test.jpg" ,file,"image/jpg" ) }, ) h={} h['Content-Type' ] =multipart_encoder.content_type res = requests.post (url,data=multipart_encoder,headers=h) print (res.text)
mqtt mqtt服务器可以使用开源的emqx ,18083登录后台,默认账号admin,密码public python的mqtt包使用paho-mqtt
通常使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 pip install paho-mqttclass mqttSample : def __init__ (self,ip,port,user="" ,password="" ): self.client = mqtt.Client() self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.client.on_disconnect = self.on_disconnect self.client.on_publish = self.on_publish self.client.on_subscribe = self.on_subscribe self.client.on_unsubscribe = self.on_unsubscribe if (user != "" ): self.client.username_pw_set(user, password) self.client.reconnect_delay_set(min_delay=1 , max_delay=120 ) self.client.connect(ip, port, 60 ) self.client.loop_start() self.client.subscribe("test" ) self.client.publish("test" , "Hello World!" ) time.sleep(2 ) self.client.loop_stop() def on_connect (self,client, userdata, flags, rc ): print ("Connected with result code " +str (rc)) def on_disconnect (client, userdata, rc ): print ("Disconnected with result code " +str (rc)) def on_message (self,client, userdata, message ): print ("Received message '" + str (message.payload) + "' on topic '" + message.topic + "' with QoS " + str (message.qos)) def on_publish (self,client, userdata, mid ): print ("mid: " +str (mid)) def on_subscribe (self,client, userdata, mid, granted_qos ): print ("Subscribed: " +str (mid)+" " +str (granted_qos)) def on_unsubscribe (self,client, userdata, mid ): print ("Unsubscribed: " +str (mid)) def disconnect (self ): self.client.disconnect() print ("Disconnected." ) def publish (self,topic,payload ): self.client.publish(topic, payload, qos=0 , retain=False ) def subscribe (self,topic ): self.client.subscribe(topic, qos=0 ) def unsubscribe (self,topic ): self.client.unsubscribe(topic)
一次性方式直接发布消息:
1 2 3 single(topic, payload =None, qos =0, retain =False , hostname ="localhost" , port =1883, client_id ="" , keepalive =60, will =None, auth =None, tls =None, protocol =mqtt.MQTTv311, transport ="tcp" )
will :一个包含客户端遗嘱参数的字典,will = {‘topic’: “”, ‘payload’:”<payload”>, ‘qos’:, ‘retain’:}.
auth :一个包含客户端验证参数的字典,auth = {‘username’:””, ‘password’:””}
tls :一个包含客户端的TLS配置参数的字典,dict = {‘ca_certs’:””, ‘certfile’:””, ‘keyfile’:””, ‘tls_version’:””, ‘ciphers’:”<ciphers”>}
示例:
1 2 import paho.mqtt.publish as publishpublish .single("single" , "payload" , hostname="192.168.2.1" ,port = 1883)
一次性发送多条
1 2 multiple(msgs, hostname ="localhost" , port =1883, client_id ="" , keepalive =60, will =None, auth =None, tls =None, protocol =mqtt.MQTTv311, transport ="tcp" )
msg = {‘topic’:””, ‘payload’:””, ‘qos’:, ‘retain’:}或(“”, “”, qos, retain)
阻塞等待消息
1 2 3 simple(topics, qos =0, msg_count =1, retained =False , hostname ="localhost" , port =1883, client_id ="" , keepalive =60, will =None, auth =None, tls =None, protocol =mqtt.MQTTv311)
msg_count 从代理检索的消息数量。 默认为1.如果为1,则返回一个MQTTMessage对象。 如果> 1,则返回MQTTMessages列表
keepalive 客户端的存活超时值。 默认为60秒。
回调方式接收
1 2 3 callback(callback, topics, qos =0, userdata =None, hostname ="localhost" , port =1883, client_id ="" , keepalive =60, will =None, auth =None, tls =None, protocol =mqtt.MQTTv311)
示例:
1 2 3 4 import paho.mqtt.subscribe as subscribe def on_message_print (client, userdata, message ): print ("%s %s" % (message .topic, message .payload)) subscribe.callback(on_message_print, "callback" , hostname="192.168.2.1" )
下载文件名转码 在请求包头中包含了下载文件的名称,默认编码下中文会无法显示,需要先编再转
1 2 3 4 5 6 7 8 import requests url = "https://url.com/bdc7bfd79016aac017e9bc369a771c36.docx" f = requests.get (url) print(f.headers.get ('Content-Disposition' ).encode('ISO-8859-1' ).decode('utf-8' ))temp = f.headers.get ('Content-Disposition' ).encode('ISO-8859-1' ).decode('utf-8' )name = temp [ temp .rindex('=' ) + 1 : len( temp ) ] print(name )
文件下载 1 2 3 4 5 f = requests.get (fileUrl) temp = f.headers .get ('Content-Disposition' ).encode ('ISO-8859-1' ).decode ('utf-8' ) name = temp[ temp.rindex('=' ) + 1 : len( temp ) ] with open ('./file/' +name , "wb" ) as code :code .write (f.content )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 if (os.path.exists(self.basePath + "/file/" + name) == False ): self.showMessage("文件下载中" ) logger.info("文件下载中" ) try : headers = {'content-type' : "application/octet-stream" ,} f = requests.get(fileUrl,headers = headers) except : logger.error("文件下载失败" ) return with open (self.basePath + '/file/' +fileName , "wb" ) as code: code.write(f.content) code.close()
数据库 sqlite 自带不需安装,直接引入
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 path = os.path.dirname(os.path.realpath(sys.argv[0 ])) dbName = path + "\\" +'Animation.db' def insertAnimationInfo (title,url1,url2,size,time ): conn = sqlite3.connect(dbName) cur = conn.cursor() insert_sql = """insert into animationInfo (title,url1,url2,size,time) values(?,?,?,?,?);""" para = (title,url1,url2,size,time) cur.execute(insert_sql, para) conn.commit() conn.close()def selectAnimationInfo (title ): conn = sqlite3.connect(dbName) cur = conn.cursor() sql = """SELECT title from animationInfo WHERE title='{}' """ .format (title) cur.execute(sql) res = cur.fetchall() conn.close() if (res == []): return False else : return True def selectTable (tableName ): conn=sqlite3.connect(dbName) cursor = conn.execute("SELECT * from " + tableName) res = [] for row in cursor: res.append({"name" :row[0 ],"key" :row[1 ],"week" :row[2 ]}) conn.close() return res
其他 版本对比 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import platformimport pkg_resources as pkgdef check_python (minimum='3.8.0' ): re = check_version(platform.python_version(), minimum, name='Python ' ) return redef check_version (current='0.0.0' , minimum='0.0.0' , name='version ' , pinned=False ): current, minimum = (pkg.parse_version(x) for x in (current, minimum)) result = (current == minimum) if pinned else (current >= minimum) if (result == False ): s = f'WARNING ⚠️ {name} {minimum} is required by YOLOv5, but {name} {current} is currently installed' print (s) return resultprint (check_python())
进度条 1 2 3 4 import time from tqdm import tqdmfor i in tqdm(range(100 )): time .sleep(0.05 )
pyyaml 示例一:
1 2 3 4 5 6 import yamlif __name__ == "__main__" : with open ("./config.yaml" ) as config: cfg = yaml.safe_load(config) print (cfg["mqtt" ])
示例二:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import yaml def read_yaml(yaml_path : Union[str , Path]) -> Dict: if not Path(yaml_path ) .exists() : raise FileExistsError(f 'The {yaml_path } does not exist .') with open (str(yaml_path), 'rb') as f: data = yaml.load(f, Loader=yaml.Loader) return data xx.yaml: A: x: 1 y: 2 B: x: 1 y: 2
生成requirements
打包 安装anaconda,打开工作台
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 conda create -n test python =3.7 conda activate test conda config --env --set subdir win-32set CONDA_FORCE_32BIT =1 切回set CONDA_FORCE_32BIT =0 conda info pip install -r requirements.txt pip install pyinstaller pyinstaller -F .\main.py --noconsole
对配置文件进行打包 main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 import sysimport os base_dir = getattr (sys, '_MEIPASS' , os.path.dirname(os.path.abspath(__file__))) config_path = os.path.join(base_dir, 'config.yaml' )with open (config_path, 'rb' ) as f: pass
打包命令
1 pyinstaller -F --add-data "config.yaml;." .\decrypt_chrome_passwords_obfx.py --noconsole
对图片打包 先将图片读取方式进行修改
1 2 3 4 5 6 def get_resource_path (relative_path ): if hasattr (sys, '_MEIPASS' ): return os.path.join(sys._MEIPASS, relative_path) return os.path.join(os.path.abspath("." ), relative_path) self.setWindowIcon(QIcon(self.get_resource_path("file/favicon-32x32.png" )))
打包命令 –add-data “file:file” 意识是将file文件夹里的文件都打包,打包后的目录也是file。
1 pyinstaller --onefile --add-data="file:file" -wF -i file/favicon-32x32.png -n "NCM转换器" .\gui.py
时间 获取今天是星期几
1 2 3 4 from datetime import datetime week_list = ["星期一" ,"星期二" ,"星期三" ,"星期四" ,"星期五" ,"星期六" ,"星期日" ] dayOfWeek = datetime.now ().weekday ()print (week_list[dayOfWeek])
获取年月日时分秒
1 time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime())
年月日转时间戳
1 2 3 4 5 6 7 8 9 import time dt = "2016-05-05 20:28:54" #转换成时间数组 timeArray = time .strptime (dt , "%Y-%m-%d %H:%M:%S" ) #转换成时间戳 timestamp = time .mktime (timeArray)print (type(timestamp) )print (timestamp) print (int(timestamp) )
计算代码运行时间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import time start_time = time.perf_counter()for i in range (1000000 ): pass end_time = time.perf_counter() elapsed_time = end_time - start_timeprint (f"代码执行时间为 {elapsed_time:.6 f} 秒" )
文件 打开文件夹窗口 1 2 3 import os path = r'D:\code\ggggg\spider-animate\dist' os.startfile(path)
创建文件夹 1 2 3 4 5 6 7 8 9 10 11 12 import os path = "./file2" isExists = os .path .exists(path ) # 判断结果if not isExists: # 如果不存在则创建目录 # 创建目录操作函数 os .makedirs(path ) print (path + ' 创建成功' )else : # 如果目录存在则不创建,并提示目录已存在 print (path + ' 目录已存在' )
判断文件是否存在 1 2 3 4 5 6 7 8 # 判断的是文件 import os if (os .path .isfile('test.1' ) ==False): print ("文件不存在" ) # 判断的是文件夹os .path .isdir(path1) # 两者均可os .path .exists(path1)
循环文件夹,判断路径文件是否存在 1 2 3 for subfolder in tqdm(os .listdir(self .iDataPath)): if not os .path .isdir(os .path .join(self .oDataPath, subfolder)): os .mkdir(os .path .join(self .oDataPath, subfolder))
文件夹下所有文件 1 2 files = os.listdir(path) for file in files :
递归遍历 1 2 3 4 5 6 7 8 def getallfilesofwalk (dir ): if not os.path.isdir(dir ): print (dir ) return dirlist = os.walk(dir ) for root, dirs, files in dirlist: for file in files: print (os.path.join(root, file))
文件操作 复制
1 shutil.copytree(os.path.join (lib,group ),os.path.join (targetLibPath,group ))
删除
1 os.unlink("C:\\ b\\ 1.txt" )
删除文件夹
1 2 3 4 try : os.rmdir("C:\\b\\new_a" )except Exception as ex: print ("错误信息:" +str (ex))
删除文件夹及其内容
1 shutil.rmtree("C:\\ b\\ new_a" )
移动文件
1 shutil.move("C:\\ a\\ 1.txt" , "C:\\ b" )
移动文件夹
1 shutil.move("C:\\ a\\ c" , "C:\\ b" )
重命名文件
1 shutil.move("C:\\ a\\ 2.txt" , "C:\\ a\\ new2.txt" )
重命名文件夹
1 shutil.move("C:\\ a\\ d" ,"C:\\ a\\ new_d" )
打开文件写入 打开
1 2 3 4 5 6 7 8 9 10 #open(file , mode ='r' , encoding) # file 要操作的文件名字, 类型是str # mode , 文件打开方式,只读打开r(read )、只写打开w (write )、追加打开a (append ) # encoding, 文件的编码格式, 常见的编码格式有两种,一种是gbk, 一种是utf-8 # 返回操作句柄f = open ('1.txt' , 'r' ) f = open ('1.txt' , 'a' , encoding='utf-8' ) #不需要关闭 with open ('1.txt' , "wb" ) as code: xxxx
写入
1 f.write ('hello world\n' )
读出
逐行读取
1 2 for line in open ("foo.txt" ): print line ,
读写JSON 1 2 3 4 5 6 7 8 9 10 11 from json import load , dump def settinRead(): with open (projectPath + '/config/animate.json' ,encoding= 'utf-8' ) as file : data = load (file ) return data def settinSet(data): with open (projectPath + '/config/animate.json' , 'w' ,encoding='utf-8' ) as file : dump(data, file ,ensure_ascii=False,indent = 4 )
图库 新增
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 def inputImgLib (group,name,url ): downLoadPath = "./download/" imglibPath = "./imgLib/" if not os.path.isdir(downLoadPath): os.mkdir(downLoadPath) if not os.path.isdir(imglibPath): os.mkdir(imglibPath) if ( (url.endswith(".jpg" )==False and url.endswith(".png" )==False )): print ("url后缀错误" ) return False fileName = url[ url.rindex( '/' ) + 1 : len ( url ) ] if (os.path.isfile(downLoadPath + fileName)): print ("文件已经被下载" ) return False if (download(url, downLoadPath, fileName) == False ): return False groupPath = imglibPath + group if not os.path.isdir(groupPath): os.mkdir(groupPath) print ("新建分组{}" .format (groupPath)) classPath = groupPath + "/" + name if not os.path.isdir(classPath): os.mkdir(classPath) print ("新建分类{}" .format (classPath)) files= os.listdir(classPath) num = 0 for file in files: num = num +1 shutil.move(downLoadPath + fileName, classPath + "/" + str (num + 1 ) + ".jpg" ) print ("添加图片{},{}" .format (classPath,num +1 )) return True
读取和存储list 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def listSaveOrRead(path,mode ="read" ,arg=None): if (mode == "save" ): if (arg == None): return False file = open (path, 'w' ) file .write (str(arg)) file .close () return True elif(mode == "read" ): file = open (path,'r' ) rdlist = eval (file .read ()) file .close () return rdlist test = ['fff' ,1 ,1.1 ] listSaveOrRead(path = "facelib.txt" ,mode ="save" ,arg = test) readText = listSaveOrRead(path='facelib.txt' )print (readText)print (type (readText))
执行时传入参数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import argparse def main(args): parser = argparse.ArgumentParser(description ='Process some integers .') parser.add_argument('-m ','--model ', required =True ,choices =['find ', 'update ']) parser.add_argument('-i ','--img ', default ='',help ="input face image path" ) parser.add_argument('-o ','--outImg ', default ='',help ="output retinaface image path" ) args = parser.parse_args() if (args.model == "input" ): imgPath = args.img imgName = args.imgNameif __name__ == "__main__" : main(sys.argv[1 :] )
异常 1 2 3 4 try : pass except Exception as e: print (e)
用MD5判断重复 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def getmd5(file ): if not os.path.isfile(file ): return fd = open (file ,'rb' ) md5 = hashlib.md5() md5.update(fd.read ()) fd.close () return md5.hexdigest() def compare_images(path_one, path_two): a = getmd5(path_one) b = getmd5(path_two) if (a != b): return True else : return False
防止重复打开 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def winOpen (sys ): app= QtWidgets.QApplication(sys.argv) lockFile = QLockFile("./webprint.app.lock" ) if lockFile.tryLock(2000 ): base = os.path.dirname(os.path.realpath(sys.argv[0 ])) file = open (base + '/assets/qss/style.qss' ,"r" , encoding="utf-8" ) qss = file.read().replace("$DataPath" ,"." ) app.setStyleSheet(qss) app.setQuitOnLastWindowClosed(False ) win = wincore() win.show() sys.exit(app.exec_()) else : msg_box = QMessageBox() msg_box.setWindowTitle("重复开启" ) msg_box.setText("请不要重复开启本软件,注意托盘区域" ) msg_box.setIcon(QMessageBox.Information) msg_box.addButton("确定" , QMessageBox.YesRole) msg_box.exec () sys.exit(-1 )
通过保存进程的识别信息进行区分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def checkMultiOpen (self ): '''检查多开''' def isMultiOpen (): '''主进程多开时返回T''' def getProcessKey (pid ): try : return str (psutil.Process(pid).create_time()) except psutil.NoSuchProcess as e: return '' lastPID = self.get('processID' ) lastKey = self.get('processKey' ) if psutil.pid_exists(lastPID): runningKey = getProcessKey(lastPID) if lastKey == runningKey: print ("多开" ) return nowPid = os.getpid() nowKey = getProcessKey(nowPid) self.set ('processID' , nowPid) self.set ('processKey' , nowKey, isSave=True )
UI pyqt 将UI文件转为py,调用界面文件 转化
示例一 如果是
1 2 class Ui_MainWindow (object ): def setupUi (self, MainWindow ):
则使用下列方式打开
1 2 3 4 5 6 7 8 9 10 11 class MyMainWindow(QMainWindow) : def __init__(self ) : super() .__init__() self.ui = Ui_MainWindow() self.ui.setupUi(self ) if __name__ == '__main__': app = QApplication(sys .argv ) window = MyMainWindow() window.show() sys.exit(app.exec_() )
示例二 如果是:
1 2 class Ui_Form (object ): def setupUi (self, Form ):
则使用下列方式打开
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import sysfrom PyQt5 .QtWidgets import QApplication, QWidgetfrom autoUpdateUi import Ui_Formclass MyApp (QWidget ): def __init__(self ): super().__init__() self.ui = Ui_Form () self.ui.setupUi(self ) if __name__ == '__main__': app = QApplication (sys .argv ) myapp = MyApp () myapp.show() sys.exit(app .exec_ ())
底部消息栏 1 2 3 self.statusBar=QStatusBar() self.setStatusBar(self .statusBar ) self.statusBar.showMessage('lalallala -------',5000)
设置窗体 名称
1 self .setWindowTitle('123' )
禁止修改大小
1 self .set FixedSize(self .width(), self .height());
绑定和设置 普通按键
1 self.pushButton .clicked .connect (self.buttonStop)
菜单栏按键
1 self.action_4 .triggered .connect (self.openConfig)
选择框
1 self.comboBox .currentIndexChanged .connect (self.comboxChange_event)
按键使能失能
1 2 self .pushButton.setEnabled(False )self .pushButton.setEnabled(True )
定时器 1 2 3 self.timer3 = QTimer () self.timer3 .timeout .connect (self.updateStatus) self.timer3 .start (1000 )
文本框 设置文本
1 self .label .set Text("ID: " +self .user Id)
设置图片
1 2 from PyQt5.QtGui import QPixmap label3.setPixmap (QPixmap ("1.png" ))
获取文本
1 2 3 theme = self.lineEdit_theme.text()cb_txt = self.comboBox.currentText()
文本框,显示并且跳到文末
1 2 self.textBrowser.insertPlainText( time .strftime ("%Y-%m-%d %H:%M:%S" , time .localtime () )+" => " + txt + "\n" ) self.textBrowser.moveCursor(self .textBrowser .textCursor () .End)
清空文本框
1 2 self.textBrowser .clear () self.textBrowser .setText ("" )
文字颜色和大小
1 self.textBrowser.append("<font color ='red' size =36 > 123123 <font > ")
限制输入
1 2 from PyQt5.QtCore import QRegExpfrom PyQt5.QtGui import QRegExpValidator, QIntValidator, QDoubleValidator
1 2 3 4 5 6 7 8 9 10 11 12 #设置浮点数限制 范围0 ~86400 ,小数点1 位 LE2Validator = QDoubleValidator(self ) LE2Validator . setRange(0, 86400) LE2Validator . setNotation(QDoubleValidator.StandardNotation) LE2Validator . setDecimals(1) self.lineEdit_2.setValidator(LE2Validator) # 限制为数字和字符 reg = QRegExp("[a-zA-Z0-9]+$" ) LE1Validator = QRegExpValidator(self ) LE1Validator . setRegExp(reg ) self.lineEdit.setValidator(LE1Validator)
右键菜单 1 2 3 4 5 6 7 8 9 10 11 self.textBrowser.setContextMenuPolicy(Qt.CustomContextMenu) self.textBrowser.customContextMenuRequested.connect(self.showMenu) self.contextMenu = QMenu(self) self.clearBrowser = self.contextMenu.addAction('清空显示' )、 self.clearBrowser.triggered.connect(self.Event)def showMenu (self,pos ): self.contextMenu.exec_(QCursor.pos()) def Event (self ): QMessageBox.information(self, "提示:" , ' 您选择了' + self.sender().text())
子窗口 首先用designer画一个子窗体,文件名为configUi,窗体名为Ui_DialogConfig 引入
1 from configUi import Ui_DialogConfig
窗体的类,和其他窗体是一样的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class ConfigWin (QtWidgets.QDialog, Ui_DialogConfig): def __init__ (self ): super (ConfigWin, self).__init__() self.setupUi(self) self.init() def init (self ): self.setWindowTitle('配置' ) self.lineEdit.setText(self.cfg.userId) self.lineEdit_2.setText(self.cfg.phone) self.pushButton.clicked.connect(self.ipScan) def ipScan (self ): pass
在主窗体的一个按键事件中打开此窗体
1 2 3 4 5 6 7 8 9 10 11 12 def setConfig (self): cfgWin = ConfigWin () cfgWin.show () def quit (): print ("quit" ) cfgWin.close () def save (): print ("save" ) cfgWin.close () cfgWin.pushButton_5.clicked.connect (quit) cfgWin.pushButton_4.clicked.connect (save) cfgWin.exec_ ()
弹窗 1 2 3 4 5 6 7 8 9 10 QMessageBox.information(self,"标题" ,"消息正文" ,QMessageBox.Yes |QMessageBox.No ,QMessageBox.Yes ) QMessageBox.question(self,"标题" ,"问答消息正文" ,QMessageBox.Yes |QMessageBox.No ,QMessageBox.Yes ) QMessageBox.warning (self,"标题" ,"警告消息正文" ,QMessageBox.Yes |QMessageBox.No ,QMessageBox.Yes ) QMessageBox.critical(self,"标题" ,"严重错误消息正文" ,QMessageBox.Yes |QMessageBox.No ,QMessageBox.Yes ) QMessageBox.about(self,"标题" ,"关于消息正文" )
显示图片 1 2 3 4 5 6 pixmap = QPixmap(img)self .label_5.set ScaledContents(True)self .label_5.set Pixmap(pixmap)self .label_5.set Pixmap(QPixmap("" ))
最小化到系统托盘 新增的函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 def setTrayIcon (self ): self.quitAppAction = QtWidgets.QAction("退出" ) self.quitAppAction.triggered.connect(self.quitApp) self.trayIconMenu = QtWidgets.QMenu() self.trayIconMenu.addAction(self.quitAppAction) self.trayIcon = QtWidgets.QSystemTrayIcon() self.trayIcon.setContextMenu(self.trayIconMenu) self.trayIcon.setIcon(QIcon('./logo.png' )) self.trayIcon.setToolTip("云打印客户端软件" ) self.trayIcon.activated[QtWidgets.QSystemTrayIcon.ActivationReason].connect(self.openMainWindow) self.trayIcon.show()def quitApp (self ): checkFlag = QtWidgets.QMessageBox.information(self, "退出确认" , "是否确认退出?" , QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No) if checkFlag == QtWidgets.QMessageBox.Yes: QtWidgets.qApp.quit() else : pass def openMainWindow (self, reason ): if reason == QtWidgets.QSystemTrayIcon.DoubleClick: self.showNormal() self.activateWindow()def closeEvent (self,event ): pass
然后在init中调用setTrayIcon
在主函数中设置 关闭全部窗口后程序不退出
1 2 3 4 5 6 7 8 def winOpen(sys): app= QtWidgets.QApplication(sys.argv) app.setQuitOnLastWindowClosed(False) win = wincore() win.show() sys.exit (app.exec_())
使用内置图标 1 2 value : 1 ~71 btn.setIcon(QApplication.style () .standardIcon(value ) )
图像处理 切图 1 2 3 4 5 6 7 8 img = Image.open('01. jpg') width , height = img.size # 前两个坐标点是左上角坐标 # 后两个坐标点是右下角坐标 # width 在前, height 在后box = (100 , 100 , 550 , 350 )region = img.crop(box ) region .save ('crop.jpg')
拼接 多张图拼接为一张图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import PIL.Image as Imageimport os IMAGES_PATH = 'E:\\数据集\\手写数字\\penbox\\2-6-Q\\' IMAGES_FORMAT = ['.jpg' , '.JPG' ,'.png' ] IMAGE_SIZE = 30 IMAGE_ROW = 3 IMAGE_COLUMN = 10 IMAGE_SAVE_PATH = 'E:\\数据集\\手写数字\\penbox\\2-6-Q.jpg' image_names = [name for name in os.listdir(IMAGES_PATH) for item in IMAGES_FORMAT if os.path.splitext(name)[1 ] == item]print ("{} {}" .format (len (image_names), IMAGE_ROW * IMAGE_COLUMN))if len (image_names) != IMAGE_ROW * IMAGE_COLUMN: print ("{} {}" .format (len (image_names), IMAGE_ROW * IMAGE_COLUMN)) raise ValueError("合成图片的参数和要求的数量不能匹配!{}" ) def image_compose (): to_image = Image.new('RGB' , (IMAGE_COLUMN * IMAGE_SIZE, IMAGE_ROW * IMAGE_SIZE)) for y in range (1 , IMAGE_ROW + 1 ): for x in range (1 , IMAGE_COLUMN + 1 ): from_image = Image.open (IMAGES_PATH + image_names[IMAGE_COLUMN * (y - 1 ) + x - 1 ]).resize( (IMAGE_SIZE, IMAGE_SIZE),Image.ANTIALIAS) to_image.paste(from_image, ((x - 1 ) * IMAGE_SIZE, (y - 1 ) * IMAGE_SIZE)) return to_image.save(IMAGE_SAVE_PATH) image_compose()
代码格式化 black 仓库 安装及其使用,python版本要大于3.7
1 2 3 pip install black black {source_file_or_directory} python -m black {source_file_or_directory}
isort 导入排序 仓库 安装
使用
1 2 isort mypythonfile.py mypythonfile2.py isort .
配置 读写配置 configparser 配置文件
1 2 3 4 5 6 7 [user] id = 123 phone = 1234567890 [network] serverhost = 192.168 .1.2
读取配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import configparser def readConfig (self ): try : logger.info("read config....." ) config = configparser.ConfigParser() config.read('config.ini' ) self.userId = config.get('user' , 'id' ) self.userPhone = config.get('user' ,'phone' ) self.serverhost = config.get('network' ,'serverhost' ) return True except : logger.error("read config error" ) return False
保存配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def saveConfig(self ): try: config = configparser.ConfigParser() filepath = "config.ini" config .read (filepath) config .set('user' , 'id' ,self .lineEdit.text()) config .set('user' ,'phone' ,self .lineEdit_2.text()) config .set('network' ,'serverhost' ,self .lineEdit_3.text()) fh = open (filepath,'w' ) config .write (fh) fh.close () logger.info("save succeed" ) self .statusBar.showMessage('配置存储成功' ,5000 ) except: logger.error ("save config error" ) self .statusBar.showMessage('配置存储失败' ,5000 )
读写配置 env .env文件
1 2 3 4 5 HOST = aaa.appDBNAME = rootPASSWORD = 123123 DATABASE = AAAAPORT = 1234
读取配置
1 2 3 4 5 6 7 from dotenv import find_dotenv, load_dotenv load_dotenv(find_dotenv('.env' ))print (os.environ.get ("HOST" ))print (os.environ.get ("DBNAME" ))print (os.environ.get ("PASSWORD" ))print (os.environ.get ("DATABASE" ))print (os.environ.get ("PORT" ))
配置管理 Hydra 安装:
官方文档 示例:
conf/config.yaml
1 2 3 4 db: driver: mysql user: omry pass: secret
Application:
1 2 3 4 5 6 7 8 9 10 11 12 13 import hydrafrom omegaconf import DictConfig, OmegaConf @hydra.main(version_base =None, config_path ="conf" , config_name ="config" ) def my_app(cfg : DictConfig) -> None: print (OmegaConf.to_yaml(cfg)) print (cfg.db.user) print (cfg['db' ]['user' ]) cfg.db.user = '123' print (cfg['db' ].user) if __name__ == "__main__" : my_app()
另一种方式:
1 2 3 4 import hydra hydra.initialize(config_path ="conf" , job_name ="test_app" ,version_base=None) cfg = hydra.compose(config_name ="config" )print (cfg)
decouple 配置管理 仓库 安装
1 pip install python-decouple
创建配置文件 .env
1 2 3 NAME = fffFLAG = True PORT = 1234
然后读取它
1 2 3 4 5 6 7 8 9 10 from decouple import configname = config('NAME' ) flag = config('FLAG' , default=False , cast =bool ) host = config('HOST' , default='192.168.1.1' ) port = config('PORT' , default=999 , cast =int ) # 使用配置项 print("{} {} || {} {} || {} {} || {} {}".format(name ,type (name ), flag,type (flag), host,type (host), port,type (port)))
结果:
1 fff <class 'str' > || True <class 'bool' > || 192.168 .1 .1 <class 'str' > || 1234 <class 'int' >
配置也可以是多项
然后
1 2 url = config('URL', cast=Csv()) print("{} {}" .format(url ,type(url )))
Csv()默认是字符串,如果是整数则可以传入参数Csv(int)
可以现在配置的参数
1 2 3 4 5 6 7 8 9 10 from decouple import config, Choices os.environ['CONNECTION_TYPE' ] = 'usb' config('CONNECTION_TYPE' , cast=Choices (['eth' , 'usb' , 'bluetooth' ]))'usb' os.environ['CONNECTION_TYPE' ] = 'serial' config('CONNECTION_TYPE' , cast=Choices (['eth' , 'usb' , 'bluetooth' ]))Traceback (most recent call last): ...ValueError : Value not in list: 'serial' ; valid values are ['eth' , 'usb' , 'bluetooth' ]
下载 多线程下载 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from PyQt5.QtCore import QThread, pyqtSignalimport requestsclass DownloaderThread (QThread ): download_completed = pyqtSignal(bool ,str ,str ,dict ) def __init__ (self, url,path,name,id ,cfg ): super ().__init__() self.url = url self.downloadPath = path self.name = name self.id = id self.cfg=cfg self.stop_thread = False def run (self ): try : headers = {'content-type' : "application/octet-stream" ,} response = requests.get(self.url, headers=headers, timeout=10 , stream=True ) block_size = 1024 with open (os.path.join(self.downloadPath,self.name) , "wb" ) as f: for chunk in response.iter_content(block_size): if chunk and not self.stop_thread: f.write(chunk) else : break if not self.stop_thread: res = True else : res = False except Exception as e: res = False self.download_completed.emit(res,self.name,self.id ,self.cfg) def stop (self ): self.stop_thread = True
调用
1 2 3 self.downloader = DownloaderThread (fileUrl,downloadPath,fileName,taskNo,cfg) self.downloader .download_completed .connect (self.downloadCallback) self.downloader .start ()
回调
1 2 3 4 5 6 7 8 9 10 def downloadCallback(self ,res ,name ,id ,cfg ) : if (res == True): self.showMessage("文件下载完成" ) logger.info("文件下载完成" ) self.workerFile(name ,id ,cfg ) else : self.showMessage("文件下载失败" ) logger.error("文件下载失败" ) self.netTask.publishTaskStatus("refuse" ,id ,"文件下载失败" ) self.startWorkFlag = False
包 假设你创建了一个名为 my_package 的 Python 包,并在其中添加了两个模块 module1.py 和 module2.py。现在,如果您想在 my_package 中定义一些变量和函数,并从这些模块中导入并使用这些变量和函数,可以使用 init .py 文件。
首先,在 my_package 目录下创建一个名为 init .py 的空文件。然后,您可以在该文件中声明变量和函数,并从其他模块中导入它们。例如:
1 2 3 4 5 6 7 8 9 10 11 12 MY_VALUE = 12345 def my_function (): pass from .module1 import module1_value, module1_functionfrom .module2 import module2_value, module2_function
在上面的代码中:
首先声明了一个名为 MY_VALUE 的变量和一个名为 my_function() 的函数。
然后,使用相对导入语句从模块 module1 和 module2 中导入变量和函数。
注意,在此处使用点号 . 表示当前包 my_package。这是因为 init .py 文件是在包 my_package 中创建的,因此它也被视为 my_package 的一部分。
最后,可以通过导入 my_package 来访问这些变量和函数
1 2 3 4 5 6 7 8 9 10 # 在其他文件中导入 my_package import my_package # 使用变量和函数print (my_package.MY_VALUE) my_package.my_function () my_package.module1_value my_package.module1_function () my_package.module2_value my_package.module2_function ()
这样就可以在 my_package 中组织和重用代码,并且通过 init .py 文件来导入和使用该包中的变量和函数。