python自用笔记

本文最后更新于:2023年11月20日 上午

自用笔记

常规

字符串

字符串截取

截取ccc

1
2
str = "aaa/bbb/ccc";
print( str[ str.rindex( '/' ) + 1 : len( str ) ] );

判断为空

1
if not message.strip():

如果 message 值经过 strip() 方法处理后为空字符串、纯空格字符串或者 None,那么条件判断表达式的值为 True,否则为 False。

移除字符

取出回车换行空格

1
2
3
a = a.replace('\n', '')
a = a.replace('\r', '')
a = a.replace(' ', '')

找子串

find,如果包含子字符串返回开始的索引值,否则返回-1。

1
2
info = 'abca'
print info.find('a',0)##从下标0开始

index 找不到会报异常

1
2
info = 'abca'
print info.index('a')

判断后缀

1
2
3
path = "test.py"
bool = path.endswith(".py")
print(bool)

截取后缀

1
2
3
4
5
6
imgUrl = 'https://modelscope.oss-cn-beijing.aliyuncs.com/test/videos/action_detection_test_video.mp4'
videoEndswith = os.path.splitext(imgUrl)[-1]
if videoEndswith == "":
print("无后缀")
else:
print(videoEndswith)

截取前缀

1
os.path.splitext(file)[0]

截取链接文件名

1
2
3
4
5
from pathlib import Path
import urllib
file = "https://url.com/file.txt?auth"
file = Path(urllib.parse.unquote(file).split('?')[0]).name
print(file)

格式化

1
2
3
print("网站名:{name}, 地址 {url}".format(name="fff", url="fff.com"))
"{1} {0} {1}".format("hello", "world") # 设置指定位置
'world hello world'

提取数字

1
print (re.findall(r"\d+\.?\d*",temp))

bytes转str

1
2
3
4
5
6
7
8
mac1 = b'\x11\x22\x33\x44\x55\x66'
mac2 = binascii.hexlify(mac1)
mac3 = str(mac2, encoding = "utf-8")
mac4 = str(mac1, encoding = "utf-8")
print(type(mac1),": ",mac1)
print(type(mac2),": ",mac2)
print(type(mac3),": ",mac3)
print(type(mac4),": ",mac4)

输出

1
2
3
4
<class 'bytes'> :  b'\x11"3DUf'
<class 'bytes'> : b'112233445566'
<class 'str'> : 112233445566
<class 'str'> : "3DUf

路径

获取当前文件路径

1
2
current_path = os.path.abspath(__file__)
J:\code\kuon\brain\__init__.py

取当前文件的父目录

1
father_path = os.path.abspath(os.path.dirname(current_path) + os.path.sep + ".")

打包exe后上面获取地址就不对了,需要使用下面方式

1
2
#获取当前目录路径
current_path = os.path.dirname(os.path.realpath(sys.executable))
1
2
# 获取当前上一级目录路径
father_path = os.path.dirname(os.path.dirname(os.path.realpath(sys.executable)))

但是上面的方式在不打包时不行,所有还有种方式

1
2
获取当前目录路径
path = os.path.dirname(os.path.realpath(sys.argv[0]))

最后代码中地址都不要采用./XXX的方式

循环

for

示例:

1
2
3
L = [1, 2, 3, 4, 5, 6]
k = [x+10 for x in L if x%2!=0]
#结果 k = [11, 13, 15]

for后的if用于排除不符合条件,for前是用于保存的值

判断

in

判断in 右侧的内容里,是否包含了左侧的内容。 包含返回真,不包含返回假。
not in 右侧的内容里是否不包含左侧的内容。不包含返回真,包含返回假。

判断元素是否在集合中

1
if inputText in self.textAList:

判断类型

1
2
3
4
5
6
isinstance(1, int)
True
isinstance(1.0, float)
True
isinstance(1,(int,float))
True

any和all

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
recRet=[]
a={"x":1,"y":1}
b={"x":1,"y":1}
c={"x":1,"y":-1}
recRet.append(a)
recRet.append(b)
recRet.append(c)

if all(member["x"] > 0 for member in recRet):
print("All members in recRet have x greater than 0")

if any(member["y"] < 0 for member in recRet):
print("At least one member in recRet has y smaller than 0")
print(123)
else:
print("All members in recRet have y greater or equal to 0")

上述代码将两个条件分别放在不同的if语句中。第一个if语句检查所有成员的x值是否大于0,并输出”All members in recRet have x greater than 0”,若条件不满足则不进行任何输出。

第二个if语句使用any()函数来判断是否存在至少一个成员的y值小于0,若条件成立,则分别输出”At least one member in recRet has y smaller than 0”和123;否则,输出”All members in recRet have y greater or equal to 0”。

随机数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import random
#0~1的浮点
random.random()
#整数
random.randint(1,6)
#指定浮点
random.uniform(1,6)
#特定区域选择一个
List = [1,2,3,4,5,6]
random.choice(List)
#随机从list中取出一个
from random import choice
l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(choice(l)) # 随机抽取一个
#随机从list取出多个,且不重复
chooseNums = sample(images,1)[0]

删除文件

1
2
for file in glob.glob("./file/*"):  
os.remove(file)

数据结构

队列

queue

1
2
3
4
5
6
7
import queue
self.msgQueue = queue.Queue()
self.msgQueue.put(txt)
txt = self.msgQueue.get()
while(self.msgQueue.empty() == False):
txt = msgQueue.get()
print(msgQueue.qsize())

json

字典转json字符串

1
2
3
4
5
impor json
a = {"name":"张三","age":18}
# type(a) 是dict
b = json.dumps(a)
# type(b) 是str

字符串转字典

1
2
3
4
t = "{\"name\":\"张三\",\"age\":18}"
resp = json.loads(t)
print(type(resp))
print(resp)

easydict

可以使得以属性的方式去访问字典的值

1
2
3
4
5
6
>>> from easydict import EasyDict as edict
>>> d = edict({'foo':3, 'bar':{'x':1, 'y':2}})
>>> d.foo
3
>>> d.bar.x
1

线程

退出所以线程

1
sys.exit(app.exec_())

定时器

1
2
3
4
5
6
7
8
9
10
11
12
import threading
def thread_Timer():
print("11111111111")
global t1
t1 = threading.Timer(5,thread_Timer)
t1.start()


if __name__ == "__main__":
t1 = threading.Timer(5, thread_Timer)
t1.start()
t1.cancel()

threading 线程

相关函数和属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
threading.active_count():返回当前存活的Thread对象数量。
threading.current_thread():返回当前线程的Thread对象。
threading.enumerate():列表形式返回所有存活的Thread对象。
threading.main_thread():返回主Thread对象。
Thread对象的方法及属性:

Thread.name:线程的名字,没有语义,可以相同名称。
Thread.ident:线程标识符,非零整数。
Thread.Daemon:是否为守护线程。
Thread.is_alive():是否存活。
Thread.start():开始线程活动。若多次调用抛出RuntimeError
Thread.run():用来重载的,
Thread.join(timeout=None):等待直到线程正常或异常结束。尚未开始抛出RuntimeError
Thread(group=None, target=None, name=None, args=(), kwargs={}, *, deamon=None):构造函数

函数建立

1
2
3
4
5
6
7
8
9
10
11
import threading
import time

def show(arg):
time.sleep(1)
print('thread '+str(arg)+" running....")

if __name__ == '__main__':
for i in range(10):
t = threading.Thread(target=show, args=(i,))
t.start()

查看线程数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time

def show():
for i in range(3):
print("lalalalla{}".format(i))
time.sleep(1)

if __name__ == '__main__':
t1 = threading.Thread(target=show)
t2 = threading.Thread(target=show)
t3 = threading.Thread(target=show)
t1.start()
t2.start()
t3.start()
while True:
nowThread = len(threading.enumerate())
print("当前线程数量:{}".format(nowThread))
if(nowThread <= 1):
break
time.sleep(0.5)

线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import threading
import time
import requests

class netTask(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.flag =False #用于启动线程但不执行功能
self.closeFlag = True # 用于停止功能执行,当不终结线程
self.taskStatus = ""

#建立连接->等待数据
def run(self):
#self.taskStatus = "运行中"
while 1:
if(self.flag):
time.sleep(10)
#退出
self.taskStatus = "暂停"
if(self.closeFlag == False):
break
time.sleep(1)

def go(self):
if(self.flag == False):
self.flag = True

def suspend(self):
self.flag = False
self.taskStatus = "暂停"

def close(self):
self.closeFlag = False
self.taskStatus = "退出"

def getStatus(self):
temp = self.taskStatus
self.taskStatus = ""
return temp

初始化和启动

1
2
3
self.nTask = netTask.netTask()
self.nTask.start()
self.nTask.go()

主线程退出是要关闭子线程,可以实现closeEvent

1
2
def closeEvent(self,event):
self.nTask.close()

APScheduler

非阻塞式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from datetime import datetime
import time
import os
from apscheduler.schedulers.background import BackgroundScheduler

def tick():
print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_job(tick, 'interval', seconds=3) #间隔3秒钟执行一次
scheduler.add_job(tick, 'date', run_date='2022-07-12 09:22')#指定时间执行一次
#scheduler.add_job(tick, 'interval', )#指定时间执行一次
#scheduler.add_job(tick,'cron',day_of_week='6') #每周六执行一次
scheduler.start()
try:
while True:
time.sleep(2) #其他任务是独立的线程执行
print('sleep!')
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
print('Exit The Job!')

阻塞式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start() #采用的是阻塞的方式,只有一个线程专职做调度的任务
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
print('Exit The Job!')

暂停和恢复

1
2
scheduler.pause()
scheduler.resume()

打包exe会报错,可以改为

1
2
3
4
5
6
7
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

scheduler = BackgroundScheduler()
trigger=IntervalTrigger(seconds=1)
self.scheduler.add_job(self.uploadClientStatus, 'interval', seconds=60,trigger=trigger)
scheduler.start()

这东西用着是方便,就是打包麻烦

进程

multiprocessin

网络

http请求

get请求

  • 设置超时时间,timeout=(2, 3) ,2表示连接超时,3表示读取超时
    1
    2
    param = {"id":"123","status":"idle"}
    response = requests.get(url,param,timeout=(2, 3))
    如果发生403,可能是需要加个头
    1
    2
    headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'}
    result = requests.get(url, headers=headers)

post请求

参数分为data和json

1
2
3
param = {"taskNo":taskNo,"errorMessage":errorMessage,"canPrint":canPrint}
try:
r = requests.post(url,json=param)

form-data上传图片

1
2
3
4
5
6
7
8
9
10
11
12
13
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
url = "http://47.108.53.108:2077/api/file/upload"
file = open('D:\\code\\face\\my\\FaceLib\\independent\\out\\test.jpg', 'rb')
multipart_encoder = MultipartEncoder(
fields={
"file":("test.jpg",file,"image/jpg")
},
)
h={}
h['Content-Type']=multipart_encoder.content_type
res = requests.post(url,data=multipart_encoder,headers=h)
print (res.text)

mqtt

mqtt服务器可以使用开源的emqx,18083登录后台,默认账号admin,密码public
python的mqtt包使用paho-mqtt

通常使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
pip install paho-mqtt
class mqttSample:
def __init__(self,ip,port,user="",password=""):

# Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
# client_id长度为0则随机生成,此时clean_session参数必须为True
# clean_session 一个决定客户端类型的布尔值。 如果为True,那么代理将在其断开连接时删除有关此客户端的所有信息。 如果为False,则客户端是持久客户端,当客户端断开连接时,订阅信息和排队消息将被保留
# userdata 用户定义的任何类型的数据作为userdata参数传递给回调函数。 它可能会在稍后使用user_data_set()函数进行更新。
# protocol 用于此客户端的MQTT协议的版本。 可以是MQTTv31或MQTTv311。
# transport 设置为“websockets”通过WebSockets发送MQTT。 保留默认的“tcp”使用原始TCP
self.client = mqtt.Client()

self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.client.on_publish = self.on_publish
self.client.on_subscribe = self.on_subscribe
self.client.on_unsubscribe = self.on_unsubscribe

if(user != ""):
self.client.username_pw_set(user, password)
# 设置遗嘱
# self.client.will_set("over", payload=None, qos=0, retain=False)
# 设置重连延迟
# 当连接丢失时,最初重新连接尝试延迟min_delay秒。 延迟在随后的尝试到中增加一倍。当连接完成时(例如收到CONNACK,而不仅仅是TCP连接建立),延迟重置为min_delay
self.client.reconnect_delay_set(min_delay=1, max_delay=120)
self.client.connect(ip, port, 60)
self.client.loop_start()
self.client.subscribe("test")
self.client.publish("test", "Hello World!")


time.sleep(2)
self.client.loop_stop()
# 阻塞,直到调用了disconnect
# loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)


#连接回调,rc返回结果
#0连接成功,1协议版本错误,2无效的客户端标识,3服务器无法使用,4错误的用户名或密码,5未经授权
def on_connect(self,client, userdata, flags, rc):
print("Connected with result code "+str(rc))

#连接断开回调 rc参数表示断开状态
def on_disconnect(client, userdata, rc):
print("Disconnected with result code "+str(rc))

#接收到订阅主题的消息
def on_message(self,client, userdata, message):
print("Received message '" + str(message.payload) + "' on topic '"
+ message.topic + "' with QoS " + str(message.qos))

#消息发送成功后的回调
#对于Qos级别为1和2的消息,这意味着已经完成了与代理的握手。
#对于Qos级别为0的消息,这只意味着消息离开了客户端。
#mid变量与从相应的publish()返回的mid变量匹配
def on_publish(self,client, userdata, mid):
print("mid: "+str(mid))

#订阅主题后的回调
#mid变量匹配从相应的subscri be()返回的mid变量
def on_subscribe(self,client, userdata, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos))

#取消订阅的回调
def on_unsubscribe(self,client, userdata, mid):
print("Unsubscribed: "+str(mid))

def disconnect(self):
self.client.disconnect()
print("Disconnected.")

def publish(self,topic,payload):
self.client.publish(topic, payload, qos=0, retain=False)

#注意订阅要放在连接之后
def subscribe(self,topic):
self.client.subscribe(topic, qos=0)
# subscribe([("my/topic", 0), ("another/topic", 2)])

def unsubscribe(self,topic):
self.client.unsubscribe(topic)

一次性方式直接发布消息:

1
2
3
single(topic, payload=None, qos=0, retain=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311, transport="tcp")
  • will :一个包含客户端遗嘱参数的字典,will = {‘topic’: “”, ‘payload’:”<payload”>, ‘qos’:, ‘retain’:}.
  • auth :一个包含客户端验证参数的字典,auth = {‘username’:””, ‘password’:””}
  • tls :一个包含客户端的TLS配置参数的字典,dict = {‘ca_certs’:””, ‘certfile’:””, ‘keyfile’:””, ‘tls_version’:””, ‘ciphers’:”<ciphers”>}

示例:

1
2
import paho.mqtt.publish as publish
publish.single("single", "payload", hostname="192.168.2.1",port = 1883)

一次性发送多条

1
2
multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")
  • msg = {‘topic’:””, ‘payload’:””, ‘qos’:, ‘retain’:}或(“”, “”, qos, retain)

阻塞等待消息

1
2
3
simple(topics, qos=0, msg_count=1, retained=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311)
  • msg_count 从代理检索的消息数量。 默认为1.如果为1,则返回一个MQTTMessage对象。 如果> 1,则返回MQTTMessages列表
  • keepalive 客户端的存活超时值。 默认为60秒。

回调方式接收

1
2
3
callback(callback, topics, qos=0, userdata=None, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311)

示例:

1
2
3
4
import paho.mqtt.subscribe as subscribe
def on_message_print(client, userdata, message):
print("%s %s" % (message.topic, message.payload))
subscribe.callback(on_message_print, "callback", hostname="192.168.2.1")

下载文件名转码

在请求包头中包含了下载文件的名称,默认编码下中文会无法显示,需要先编再转

1
2
3
4
5
6
7
8
import requests

url = "https://url.com/bdc7bfd79016aac017e9bc369a771c36.docx"
f = requests.get(url)
print(f.headers.get('Content-Disposition').encode('ISO-8859-1').decode('utf-8'))
temp = f.headers.get('Content-Disposition').encode('ISO-8859-1').decode('utf-8')
name = temp[ temp.rindex('=') + 1 : len( temp ) ]
print(name)

文件下载

1
2
3
4
5
f = requests.get(fileUrl)
temp = f.headers.get('Content-Disposition').encode('ISO-8859-1').decode('utf-8')
name = temp[ temp.rindex('=') + 1 : len( temp ) ]
with open('./file/'+name , "wb") as code:
code.write(f.content)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 判断文件是否需要下载,也即找本地是否有同名文件
if(os.path.exists(self.basePath + "/file/" + name) == False):
self.showMessage("文件下载中")
logger.info("文件下载中")
try:
headers = {'content-type': "application/octet-stream",}
f = requests.get(fileUrl,headers = headers)
except:
logger.error("文件下载失败")
return
with open(self.basePath + '/file/'+fileName , "wb") as code:
code.write(f.content)
code.close()

数据库

sqlite

自带不需安装,直接引入

1
import sqlite3

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
path = os.path.dirname(os.path.realpath(sys.argv[0]))
dbName = path + "\\"+'Animation.db'
# 插入数据
def insertAnimationInfo(title,url1,url2,size,time):
# 存入数据库
conn = sqlite3.connect(dbName)
cur = conn.cursor()
insert_sql = """insert into animationInfo (title,url1,url2,size,time) values(?,?,?,?,?);"""
para = (title,url1,url2,size,time)
cur.execute(insert_sql, para)
conn.commit()
conn.close()

#查询搜索目标是否存在
def selectAnimationInfo(title):
conn = sqlite3.connect(dbName)
cur = conn.cursor()
sql = """SELECT title from animationInfo WHERE title='{}' """.format(title)
cur.execute(sql)
res = cur.fetchall()
conn.close()
if(res == []):
return False
else:
return True

def selectTable(tableName):
conn=sqlite3.connect(dbName)
cursor = conn.execute("SELECT * from " + tableName)
res = []
for row in cursor:
res.append({"name":row[0],"key":row[1],"week":row[2]})
conn.close()
return res

其他

版本对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import platform
import pkg_resources as pkg

def check_python(minimum='3.8.0'):
re = check_version(platform.python_version(), minimum, name='Python ')
return re

# current 当前版本 minimum 最小版本
def check_version(current='0.0.0', minimum='0.0.0', name='version ', pinned=False):
# Check version vs. required version
current, minimum = (pkg.parse_version(x) for x in (current, minimum))
result = (current == minimum) if pinned else (current >= minimum) # bool
if(result == False):
s = f'WARNING ⚠️ {name}{minimum} is required by YOLOv5, but {name}{current} is currently installed' # string
print(s)
return result

print(check_python())

进度条

1
2
3
4
import time
from tqdm import tqdm
for i in tqdm(range(100)):
time.sleep(0.05)

pyyaml

示例一:

1
2
3
4
5
6
import yaml

if __name__ == "__main__":
with open("./config.yaml") as config:
cfg = yaml.safe_load(config)
print(cfg["mqtt"])

示例二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import yaml
def read_yaml(yaml_path: Union[str, Path]) -> Dict:
if not Path(yaml_path).exists():
raise FileExistsError(f'The {yaml_path} does not exist.')

with open(str(yaml_path), 'rb') as f:
data = yaml.load(f, Loader=yaml.Loader)
return data

xx.yaml:
A:
x: 1
y: 2

B:
x: 1
y: 2

生成requirements

1
2
pip install pipreqs
pipreqs . --encoding=utf8 --force

打包

安装anaconda,打开工作台

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#创建环境,3.9版本win7可能不支持
conda create -n test python=3.7
#进入环境
conda activate test
#切换32位
conda config --env --set subdir win-32
#或者
set CONDA_FORCE_32BIT=1 切回set CONDA_FORCE_32BIT=0
#查看
conda info
#安装依赖
pip install -r requirements.txt
#安装打包软件
pip install pyinstaller
#打包
pyinstaller -F .\main.py --noconsole

对配置文件进行打包

main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
import sys
import os

# 获取打包后可执行文件所在的目录
base_dir = getattr(sys, '_MEIPASS', os.path.dirname(os.path.abspath(__file__)))

# 使用绝对路径来访问文件
config_path = os.path.join(base_dir, 'config.yaml')

with open(config_path, 'rb') as f:
# 处理config.yaml文件
pass

打包命令

1
pyinstaller -F --add-data "config.yaml;." .\decrypt_chrome_passwords_obfx.py --noconsole

时间

获取今天是星期几

1
2
3
4
from datetime import datetime
week_list = ["星期一","星期二","星期三","星期四","星期五","星期六","星期日"]
dayOfWeek = datetime.now().weekday()
print(week_list[dayOfWeek])

获取年月日时分秒

1
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

年月日转时间戳

1
2
3
4
5
6
7
8
9
import time
dt = "2016-05-05 20:28:54"
#转换成时间数组
timeArray = time.strptime(dt, "%Y-%m-%d %H:%M:%S")
#转换成时间戳
timestamp = time.mktime(timeArray)
print(type(timestamp))
print(timestamp)
print(int(timestamp))

计算代码运行时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time

start_time = time.perf_counter()

# 执行要测试的代码块
for i in range(1000000):
pass

end_time = time.perf_counter()

# 计算代码块的执行时间
elapsed_time = end_time - start_time

print(f"代码执行时间为 {elapsed_time:.6f} 秒")

文件

打开文件夹窗口

1
2
3
import os
path = r'D:\code\ggggg\spider-animate\dist'
os.startfile(path)

创建文件夹

1
2
3
4
5
6
7
8
9
10
11
12
import os
path = "./file2"
isExists = os.path.exists(path)
# 判断结果
if not isExists:
# 如果不存在则创建目录
# 创建目录操作函数
os.makedirs(path)
print(path + ' 创建成功')
else:
# 如果目录存在则不创建,并提示目录已存在
print(path + ' 目录已存在')

判断文件是否存在

1
2
3
4
5
6
7
8
# 判断的是文件
import os
if(os.path.isfile('test.1') ==False):
print("文件不存在")
# 判断的是文件夹
os.path.isdir(path1)
# 两者均可
os.path.exists(path1)

循环文件夹,判断路径文件是否存在

1
2
3
for subfolder in tqdm(os.listdir(self.iDataPath)):
if not os.path.isdir(os.path.join(self.oDataPath, subfolder)):
os.mkdir(os.path.join(self.oDataPath, subfolder))

文件夹下所有文件

1
2
files= os.listdir(path) #得到文件夹下的所有文件名称
for file in files: #遍历文件夹

递归遍历

1
2
3
4
5
6
7
8
def getallfilesofwalk(dir):
if not os.path.isdir(dir):
print(dir)
return
dirlist = os.walk(dir)
for root, dirs, files in dirlist:
for file in files:
print(os.path.join(root, file))

文件操作

复制

1
shutil.copytree(os.path.join(lib,group),os.path.join(targetLibPath,group))

删除

1
os.unlink("C:\\b\\1.txt")

删除文件夹

1
2
3
4
try:
os.rmdir("C:\\b\\new_a")
except Exception as ex:
print("错误信息:"+str(ex)) # 提示:错误信息,目录不是空的

删除文件夹及其内容

1
shutil.rmtree("C:\\b\\new_a")

移动文件

1
shutil.move("C:\\a\\1.txt", "C:\\b")

移动文件夹

1
shutil.move("C:\\a\\c", "C:\\b")

重命名文件

1
shutil.move("C:\\a\\2.txt", "C:\\a\\new2.txt")

重命名文件夹

1
shutil.move("C:\\a\\d","C:\\a\\new_d")

打开文件写入

打开

1
2
3
4
5
6
7
8
9
10
#open(file, mode='r', encoding)
# file 要操作的文件名字, 类型是str
# mode, 文件打开方式,只读打开r(read)、只写打开w(write)、追加打开a(append)
# encoding, 文件的编码格式, 常见的编码格式有两种,一种是gbk, 一种是utf-8
# 返回操作句柄
f = open('1.txt', 'r')
f = open('1.txt', 'a', encoding='utf-8')
#不需要关闭
with open('1.txt', "wb") as code:
xxxx

写入

1
f.write('hello world\n')

读出

1
buf = f.read()

逐行读取

1
2
for line in open("foo.txt"): 
print line,

读写JSON

1
2
3
4
5
6
7
8
9
10
11
from json import load, dump

def settinRead():
with open(projectPath + '/config/animate.json',encoding= 'utf-8') as file:
data = load(file)
return data


def settinSet(data):
with open(projectPath + '/config/animate.json', 'w',encoding='utf-8') as file:
dump(data, file,ensure_ascii=False,indent = 4)

图库

新增

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 存图入库,传入分组、名称、下载链接
# 图库格式
# imgLib
# - group1
# - name1
# - 1.jpg
# - 2.jpg
# - name2
# - 1.jpg
def inputImgLib(group,name,url):
downLoadPath = "./download/"
imglibPath = "./imgLib/"
if not os.path.isdir(downLoadPath):
os.mkdir(downLoadPath)
if not os.path.isdir(imglibPath):
os.mkdir(imglibPath)

#首先检测url格式
if( (url.endswith(".jpg")==False and url.endswith(".png")==False)):
print("url后缀错误")
return False
#查询缓存区是否已经下载过此图片
#零时缓冲区本不应该发生此情况,但是在多线程执行,且存在会执行保存相同url链接时
fileName = url[ url.rindex( '/' ) + 1 : len( url ) ]
if(os.path.isfile(downLoadPath + fileName)):
print("文件已经被下载")
return False
#图片缓存本地,命名方式为url
if(download(url, downLoadPath, fileName) == False):
return False

#首先判断组文件夹是否存在
groupPath = imglibPath + group
if not os.path.isdir(groupPath):
os.mkdir(groupPath)
print("新建分组{}".format(groupPath))

#判断对象文件夹是否存在
classPath = groupPath + "/" + name
if not os.path.isdir(classPath):
os.mkdir(classPath)
print("新建分类{}".format(classPath))

files= os.listdir(classPath) #得到文件夹下的所有文件名称
num = 0
for file in files: #遍历文件夹
num = num +1

shutil.move(downLoadPath + fileName, classPath + "/" + str(num + 1) + ".jpg")
print("添加图片{},{}".format(classPath,num +1))
return True

读取和存储list

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def listSaveOrRead(path,mode="read",arg=None):
if(mode == "save"):
if(arg == None):
return False
file = open(path, 'w')
file.write(str(arg))
file.close()
return True
elif(mode == "read"):
file = open(path,'r')
rdlist = eval(file.read())
file.close()
return rdlist

test = ['fff',1,1.1]
listSaveOrRead(path = "facelib.txt",mode="save",arg = test)
readText = listSaveOrRead(path='facelib.txt')
print(readText)
print(type(readText))

执行时传入参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import argparse
def main(args):
parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument('-m','--model', required=True ,choices=['find', 'update'])
parser.add_argument('-i','--img', default='',help="input face image path")
parser.add_argument('-o','--outImg', default='',help="output retinaface image path")
args = parser.parse_args()

if(args.model == "input"):
imgPath = args.img
imgName = args.imgName

if __name__ == "__main__":
main(sys.argv[1:])

异常

1
2
3
4
try:
pass
except Exception as e:
print(e)

用MD5判断重复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 获取MD5值        
def getmd5(file):
if not os.path.isfile(file):
return
fd = open(file,'rb')
md5 = hashlib.md5()
md5.update(fd.read())
fd.close()
return md5.hexdigest()


# 利用HASH值来判断是否重复
def compare_images(path_one, path_two):
a = getmd5(path_one)
b = getmd5(path_two)
if(a != b):
return True
else:
return False

防止重复打开

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def winOpen(sys):
app= QtWidgets.QApplication(sys.argv)
#防止多开
lockFile = QLockFile("./webprint.app.lock")

if lockFile.tryLock(2000):
base = os.path.dirname(os.path.realpath(sys.argv[0]))
file = open(base + '/assets/qss/style.qss',"r", encoding="utf-8")
qss = file.read().replace("$DataPath",".")
app.setStyleSheet(qss)
# 关闭全部窗口后程序不退出
app.setQuitOnLastWindowClosed(False)
# 界面
win = wincore()
win.show()
sys.exit(app.exec_())
else:
#messagebox.showinfo("重复开启","请不要重复开启本软件,注意托盘区域")
#sys.exit(-1)
msg_box = QMessageBox()
msg_box.setWindowTitle("重复开启")
msg_box.setText("请不要重复开启本软件,注意托盘区域")
msg_box.setIcon(QMessageBox.Information)
msg_box.addButton("确定", QMessageBox.YesRole)
msg_box.exec()
sys.exit(-1)

通过保存进程的识别信息进行区分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def checkMultiOpen(self):
'''检查多开'''
def isMultiOpen():
'''主进程多开时返回T'''
def getProcessKey(pid):
# 区分不同时间和空间上同一个进程的识别信息
try:
return str(psutil.Process(pid).create_time())
except psutil.NoSuchProcess as e: # 虽然psutil.pid_exists验证pid存在,但 Process 无法生成对象
return ''
# 检查上次记录的pid和key是否还在运行
lastPID = self.get('processID')
lastKey = self.get('processKey')
if psutil.pid_exists(lastPID): # 上次记录的pid如今存在
runningKey = getProcessKey(lastPID)
if lastKey == runningKey: # 上次记录的key与它pid当前key对应,则证实多开
print("多开")
return

# 本次为唯一的进程,记录当前进程信息
nowPid = os.getpid()
nowKey = getProcessKey(nowPid)
self.set('processID', nowPid)
self.set('processKey', nowKey, isSave=True)

UI

pyqt

将UI文件转为py,调用界面文件

转化

1
pyuic5 -o gui.py gui.ui

示例一

如果是

1
2
class Ui_MainWindow(object):
def setupUi(self, MainWindow):

则使用下列方式打开

1
2
3
4
5
6
7
8
9
10
11
class MyMainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.ui = Ui_MainWindow()
self.ui.setupUi(self)

if __name__ == '__main__':
app = QApplication(sys.argv)
window = MyMainWindow()
window.show()
sys.exit(app.exec_())

示例二

如果是:

1
2
class Ui_Form(object):
def setupUi(self, Form):

则使用下列方式打开

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import sys
from PyQt5.QtWidgets import QApplication, QWidget
from autoUpdateUi import Ui_Form

class MyApp(QWidget):
def __init__(self):
super().__init__()
self.ui = Ui_Form()
self.ui.setupUi(self)

if __name__ == '__main__':
app = QApplication(sys.argv)
myapp = MyApp()
myapp.show()
sys.exit(app.exec_())

底部消息栏

1
2
3
self.statusBar=QStatusBar()
self.setStatusBar(self.statusBar)
self.statusBar.showMessage('lalallala-------',5000)

设置窗体

名称

1
self.setWindowTitle('123')

禁止修改大小

1
self.setFixedSize(self.width(), self.height());

绑定和设置

普通按键

1
self.pushButton.clicked.connect(self.buttonStop)

菜单栏按键

1
self.action_4.triggered.connect(self.openConfig)

选择框

1
self.comboBox.currentIndexChanged.connect(self.comboxChange_event)

按键使能失能

1
2
self.pushButton.setEnabled(False)
self.pushButton.setEnabled(True)

定时器

1
2
3
self.timer3 = QTimer()
self.timer3.timeout.connect(self.updateStatus)
self.timer3.start(1000)

文本框

设置文本

1
self.label.setText("ID: "+self.userId)

设置图片

1
2
from PyQt5.QtGui import QPixmap
label3.setPixmap(QPixmap("1.png"))

获取文本

1
2
3
theme = self.lineEdit_theme.text()
cb_txt = self.comboBox.currentText()

文本框,显示并且跳到文末

1
2
self.textBrowser.insertPlainText( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())+" => "+ txt + "\n")
self.textBrowser.moveCursor(self.textBrowser.textCursor().End)

清空文本框

1
2
self.textBrowser.clear()
self.textBrowser.setText("")

文字颜色和大小

1
self.textBrowser.append("<font color='red' size=36>  123123 <font>")

限制输入

1
2
from PyQt5.QtCore import QRegExp
from PyQt5.QtGui import QRegExpValidator, QIntValidator, QDoubleValidator
1
2
3
4
5
6
7
8
9
10
11
12
#设置浮点数限制 范围0~86400,小数点1
LE2Validator = QDoubleValidator(self)
LE2Validator.setRange(0, 86400)
LE2Validator.setNotation(QDoubleValidator.StandardNotation)
LE2Validator.setDecimals(1)
self.lineEdit_2.setValidator(LE2Validator)

# 限制为数字和字符
reg = QRegExp("[a-zA-Z0-9]+$")
LE1Validator = QRegExpValidator(self)
LE1Validator.setRegExp(reg)
self.lineEdit.setValidator(LE1Validator)

右键菜单

1
2
3
4
5
6
7
8
9
10
11
self.textBrowser.setContextMenuPolicy(Qt.CustomContextMenu)
self.textBrowser.customContextMenuRequested.connect(self.showMenu)

self.contextMenu = QMenu(self)
self.clearBrowser = self.contextMenu.addAction('清空显示')、
self.clearBrowser.triggered.connect(self.Event)

def showMenu(self,pos):
self.contextMenu.exec_(QCursor.pos())
def Event(self):
QMessageBox.information(self, "提示:", ' 您选择了' + self.sender().text())

子窗口

首先用designer画一个子窗体,文件名为configUi,窗体名为Ui_DialogConfig
引入

1
from configUi import Ui_DialogConfig

窗体的类,和其他窗体是一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ConfigWin(QtWidgets.QDialog, Ui_DialogConfig):
def __init__(self):
super(ConfigWin, self).__init__()
self.setupUi(self)
self.init()
def init(self):
self.setWindowTitle('配置')

self.lineEdit.setText(self.cfg.userId)
self.lineEdit_2.setText(self.cfg.phone)

self.pushButton.clicked.connect(self.ipScan)
def ipScan(self):
pass

在主窗体的一个按键事件中打开此窗体

1
2
3
4
5
6
7
8
9
10
11
12
def setConfig(self):
cfgWin = ConfigWin()
cfgWin.show()
def quit():
print("quit")
cfgWin.close()
def save():
print("save")
cfgWin.close()
cfgWin.pushButton_5.clicked.connect(quit)
cfgWin.pushButton_4.clicked.connect(save)
cfgWin.exec_()

弹窗

1
2
3
4
5
6
7
8
9
10
QMessageBox.information(self,"标题","消息正文",QMessageBox.Yes|QMessageBox.No,QMessageBox.Yes)

QMessageBox.question(self,"标题","问答消息正文",QMessageBox.Yes|QMessageBox.No,QMessageBox.Yes)

QMessageBox.warning(self,"标题","警告消息正文",QMessageBox.Yes|QMessageBox.No,QMessageBox.Yes)

QMessageBox.critical(self,"标题","严重错误消息正文",QMessageBox.Yes|QMessageBox.No,QMessageBox.Yes)

QMessageBox.about(self,"标题","关于消息正文")

显示图片

1
2
3
4
5
6
pixmap = QPixmap(img)
#自适应大小
self.label_5.setScaledContents(True)
self.label_5.setPixmap(pixmap)
#取消显示
self.label_5.setPixmap(QPixmap(""))

最小化到系统托盘

新增的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#托盘
def setTrayIcon(self):
# 初始化菜单单项
self.quitAppAction = QtWidgets.QAction("退出")

# 菜单单项连接方法
self.quitAppAction.triggered.connect(self.quitApp)

# 初始化菜单列表
self.trayIconMenu = QtWidgets.QMenu()
# self.trayIconMenu.addSeparator()
self.trayIconMenu.addAction(self.quitAppAction)

# 构建菜单UI
self.trayIcon = QtWidgets.QSystemTrayIcon()
self.trayIcon.setContextMenu(self.trayIconMenu)

self.trayIcon.setIcon(QIcon('./logo.png'))
self.trayIcon.setToolTip("云打印客户端软件")
# 左键双击打开主界面
self.trayIcon.activated[QtWidgets.QSystemTrayIcon.ActivationReason].connect(self.openMainWindow)
# 允许托盘菜单显示
self.trayIcon.show()


def quitApp(self):
checkFlag = QtWidgets.QMessageBox.information(self, "退出确认", "是否确认退出?",
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No)
if checkFlag == QtWidgets.QMessageBox.Yes:
#logger.info("点击了退出")
QtWidgets.qApp.quit()
else:
pass

def openMainWindow(self, reason):
if reason == QtWidgets.QSystemTrayIcon.DoubleClick:
#logger.info("恢复窗体显示")
self.showNormal()
self.activateWindow()

def closeEvent(self,event):
#logger.info("点击了窗体上的关闭按钮")
pass

然后在init中调用setTrayIcon

1
self.setTrayIcon()

在主函数中设置 关闭全部窗口后程序不退出

1
2
3
4
5
6
7
8
def winOpen(sys):
app= QtWidgets.QApplication(sys.argv)
# 关闭全部窗口后程序不退出
app.setQuitOnLastWindowClosed(False)

win = wincore()
win.show()
sys.exit(app.exec_())

使用内置图标

1
2
value : 1~71
btn.setIcon(QApplication.style().standardIcon(value))

图像处理

切图

1
2
3
4
5
6
7
8
img = Image.open('01.jpg') 
width, height = img.size
# 前两个坐标点是左上角坐标
# 后两个坐标点是右下角坐标
# width在前, height在后
box = (100, 100, 550, 350)
region = img.crop(box)
region.save('crop.jpg')

拼接

多张图拼接为一张图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import PIL.Image as Image
import os

IMAGES_PATH = 'E:\\数据集\\手写数字\\penbox\\2-6-Q\\' # 图片集地址
IMAGES_FORMAT = ['.jpg', '.JPG','.png'] # 图片格式
IMAGE_SIZE = 30 # 每张小图片的大小
IMAGE_ROW = 3 # 图片间隔,也就是合并成一张图后,一共有几行
IMAGE_COLUMN = 10 # 图片间隔,也就是合并成一张图后,一共有几列
IMAGE_SAVE_PATH = 'E:\\数据集\\手写数字\\penbox\\2-6-Q.jpg' # 图片转换后的地址

image_names = [name for name in os.listdir(IMAGES_PATH) for item in IMAGES_FORMAT if
os.path.splitext(name)[1] == item]

print("{} {}".format(len(image_names), IMAGE_ROW * IMAGE_COLUMN))
# 简单的对于参数的设定和实际图片集的大小进行数量判断
if len(image_names) != IMAGE_ROW * IMAGE_COLUMN:
print("{} {}".format(len(image_names), IMAGE_ROW * IMAGE_COLUMN))
raise ValueError("合成图片的参数和要求的数量不能匹配!{}")

# 定义图像拼接函数
def image_compose():
to_image = Image.new('RGB', (IMAGE_COLUMN * IMAGE_SIZE, IMAGE_ROW * IMAGE_SIZE)) #创建一个新图
# 循环遍历,把每张图片按顺序粘贴到对应位置上
for y in range(1, IMAGE_ROW + 1):
for x in range(1, IMAGE_COLUMN + 1):
from_image = Image.open(IMAGES_PATH + image_names[IMAGE_COLUMN * (y - 1) + x - 1]).resize(
(IMAGE_SIZE, IMAGE_SIZE),Image.ANTIALIAS)
to_image.paste(from_image, ((x - 1) * IMAGE_SIZE, (y - 1) * IMAGE_SIZE))
return to_image.save(IMAGE_SAVE_PATH) # 保存新图

image_compose() #调用函数

代码格式化

black

仓库
安装及其使用,python版本要大于3.7

1
2
3
pip install black
black {source_file_or_directory}
python -m black {source_file_or_directory}

isort 导入排序

仓库
安装

1
pip install isort

使用

1
2
isort mypythonfile.py mypythonfile2.py
isort .

配置

读写配置 configparser

配置文件

1
2
3
4
5
6
7
[user]
id = 123
phone = 1234567890

[network]
serverhost = 192.168.1.2

读取配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import configparser

def readConfig(self):
try:
logger.info("read config.....")
config = configparser.ConfigParser()
config.read('config.ini')
#获取用户信息
self.userId = config.get('user', 'id')
self.userPhone = config.get('user','phone')
#获取服务器信息
self.serverhost = config.get('network','serverhost')
return True

except:
logger.error("read config error")
return False

保存配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def saveConfig(self):
try:
config = configparser.ConfigParser()
filepath = "config.ini"
config.read(filepath)

config.set('user', 'id',self.lineEdit.text())
config.set('user','phone',self.lineEdit_2.text())

config.set('network','serverhost',self.lineEdit_3.text())

fh = open(filepath,'w')
config.write(fh)
fh.close()
logger.info("save succeed")
self.statusBar.showMessage('配置存储成功',5000)
except:
logger.error("save config error")
self.statusBar.showMessage('配置存储失败',5000)

读写配置 env

.env文件

1
2
3
4
5
HOST=aaa.app
DBNAME=root
PASSWORD=123123
DATABASE=AAAA
PORT = 1234

读取配置

1
2
3
4
5
6
7
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv('.env'))
print(os.environ.get("HOST"))
print(os.environ.get("DBNAME"))
print(os.environ.get("PASSWORD"))
print(os.environ.get("DATABASE"))
print(os.environ.get("PORT"))

配置管理 Hydra

安装:

1
pip install hydra-core

官方文档
示例:

conf/config.yaml

1
2
3
4
db:
driver: mysql
user: omry
pass: secret

Application:

1
2
3
4
5
6
7
8
9
10
11
12
13
import hydra
from omegaconf import DictConfig, OmegaConf

@hydra.main(version_base=None, config_path="conf", config_name="config")
def my_app(cfg : DictConfig) -> None:
print(OmegaConf.to_yaml(cfg))
print(cfg.db.user)
print(cfg['db']['user'])
cfg.db.user = '123'
print(cfg['db'].user)

if __name__ == "__main__":
my_app()

另一种方式:

1
2
3
4
import hydra
hydra.initialize(config_path="conf", job_name="test_app",version_base=None)
cfg = hydra.compose(config_name="config")
print(cfg)

decouple 配置管理

仓库
安装

1
pip install python-decouple

创建配置文件 .env

1
2
3
NAME = fff
FLAG = True
PORT = 1234

然后读取它

1
2
3
4
5
6
7
8
9
10
from decouple import config

name = config('NAME')
flag = config('FLAG', default=False, cast=bool)
host = config('HOST', default='192.168.1.1')
port = config('PORT', default=999, cast=int)

# 使用配置项
print("{} {} || {} {} || {} {} || {} {}".format(name,type(name), flag,type(flag), host,type(host), port,type(port)))

结果:

1
fff <class 'str'> || True <class 'bool'> || 192.168.1.1 <class 'str'> || 1234 <class 'int'>

配置也可以是多项

1
URL=123.com 456.com

然后

1
2
url = config('URL', cast=Csv())
print("{} {}".format(url,type(url)))

Csv()默认是字符串,如果是整数则可以传入参数Csv(int)

可以现在配置的参数

1
2
3
4
5
6
7
8
9
10
from decouple import config, Choices
os.environ['CONNECTION_TYPE'] = 'usb'
config('CONNECTION_TYPE', cast=Choices(['eth', 'usb', 'bluetooth']))
'usb'

os.environ['CONNECTION_TYPE'] = 'serial'
config('CONNECTION_TYPE', cast=Choices(['eth', 'usb', 'bluetooth']))
Traceback (most recent call last):
...
ValueError: Value not in list: 'serial'; valid values are ['eth', 'usb', 'bluetooth']

下载

多线程下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from PyQt5.QtCore import QThread, pyqtSignal
import requests

class DownloaderThread(QThread):
download_completed = pyqtSignal(bool,str,str,dict)

def __init__(self, url,path,name,id,cfg):
super().__init__()
self.url = url
self.downloadPath = path
self.name = name
self.id = id
self.cfg=cfg
self.stop_thread = False

def run(self):
try:
headers = {'content-type': "application/octet-stream",}
response = requests.get(self.url, headers=headers, timeout=10, stream=True)
block_size = 1024
with open(os.path.join(self.downloadPath,self.name) , "wb") as f:
for chunk in response.iter_content(block_size):
if chunk and not self.stop_thread:
f.write(chunk)
else:
break
if not self.stop_thread:
res = True
else:
#os.remove(os.path.join(self.downloadPath, self.name))
res = False
except Exception as e:
res = False
# 下载完成后,通过信号通知主UI
self.download_completed.emit(res,self.name,self.id,self.cfg)

def stop(self):
self.stop_thread = True

调用

1
2
3
self.downloader = DownloaderThread(fileUrl,downloadPath,fileName,taskNo,cfg)
self.downloader.download_completed.connect(self.downloadCallback)
self.downloader.start()

回调

1
2
3
4
5
6
7
8
9
10
def downloadCallback(self,res,name,id,cfg):
if(res == True):
self.showMessage("文件下载完成")
logger.info("文件下载完成")
self.workerFile(name,id,cfg)
else:
self.showMessage("文件下载失败")
logger.error("文件下载失败")
self.netTask.publishTaskStatus("refuse",id,"文件下载失败")
self.startWorkFlag = False

假设你创建了一个名为 my_package 的 Python 包,并在其中添加了两个模块 module1.py 和 module2.py。现在,如果您想在 my_package 中定义一些变量和函数,并从这些模块中导入并使用这些变量和函数,可以使用 init.py 文件。

首先,在 my_package 目录下创建一个名为 init.py 的空文件。然后,您可以在该文件中声明变量和函数,并从其他模块中导入它们。例如:

1
2
3
4
5
6
7
8
9
10
11
12
# my_package/__init__.py

# 声明变量
MY_VALUE = 12345

# 定义函数
def my_function():
pass

# 导入模块中的变量和函数
from .module1 import module1_value, module1_function
from .module2 import module2_value, module2_function

在上面的代码中:

  • 首先声明了一个名为 MY_VALUE 的变量和一个名为 my_function() 的函数。
  • 然后,使用相对导入语句从模块 module1 和 module2 中导入变量和函数。
  • 注意,在此处使用点号 . 表示当前包 my_package。这是因为 init.py 文件是在包 my_package 中创建的,因此它也被视为 my_package 的一部分。
  • 最后,可以通过导入 my_package 来访问这些变量和函数
1
2
3
4
5
6
7
8
9
10
# 在其他文件中导入 my_package
import my_package

# 使用变量和函数
print(my_package.MY_VALUE)
my_package.my_function()
my_package.module1_value
my_package.module1_function()
my_package.module2_value
my_package.module2_function()

这样就可以在 my_package 中组织和重用代码,并且通过 init.py 文件来导入和使用该包中的变量和函数。


python自用笔记
https://blog.kala.love/posts/fc8d02ff/
作者
Lissettecarlr
发布于
2022年7月7日
许可协议