白乐天

道阻且长,行则将至。

Python语言

urllib

parse

quote()

quote_plus()

requests

request()

requests.request() 是一个通用的函数,它接受 HTTP 方法(如 “get”、”post”、”put” 等)和各种可选参数来构造请求,并返回 Response 对象。

1
def request(method, url, **kwargs):

get()

get请求

1
2
3
4
5
6
7
def get(url, params=None, **kwargs):
return request("get", url, params=params, **kwargs)

# url:请求的目标 URL。这个是必需的参数。
# params:可选参数,表示要发送的查询字符串数据。可以是字典、元组列表或字节流。默认值为 None,表示没有查询参数。
# kwargs:其他可选的关键字参数,传递给 requests 库的 request 函数。这些参数可以包括 headers、cookies、timeout 等。
# 返回值:返回一个Response对象。

post()

post请求

1
2
3
4
5
6
7
8
def post(url, data=None, json=None, **kwargs):
return request("post", url, data=data, json=json, **kwargs)

# url:请求的目标 URL,必须提供。
# data:可选参数,用于发送的数据。它可以是一个字典、元组列表、字节流或者文件对象。默认值是 None。
# json:可选参数,如果要发送 JSON 数据,可以传递一个 Python 对象(如字典、列表等),它将会被序列化为 JSON 格式。默认值是 None。
# kwargs:其他的可选参数,这些参数将传递给 requests.request() 函数,包括 headers、timeout、auth 等。
# 返回值:返回一个 Response 对象,它包含了来自服务器的响应数据。

Response对象

属性

content

返回响应内容,以字节为单位

cookies

返回一个CookieJar对象,其中包含从服务器发回的cookie。

encoding

返回用于解码的编码

headers

返回响应头的字典

status_code

返回状态码

text

以unicode形式返回响应的内容

url

返回响应的URL

方法

json()

返回JSON对象结果。

close()

关闭与服务器的连接

多进程multiprocessing

在多进程模型中,操作系统会分配多个进程,每个进程拥有独立的内存空间和资源。

Process类

构造方法

创建子进程

1
2
3
4
5
process = Process(target=None, name=None, args=(), kwargs={})
# target: 指定要在新进程中执行的目标函数。
# name: 设置进程的名称。
# args: 传递给目标函数的参数,以元组的形式提供。
# kwargs: 传递给目标函数的关键字参数,以字典的形式提供。

run() 方法

run() 方法定义了进程执行的任务。通常我们不直接调用 run(),而是通过 start() 来启动进程,start() 方法会在子进程中自动调用 run()。

start() 方法

start() 方法用于启动子进程。调用 start() 后,进程会被创建并在后台运行,接着会自动调用 run() 方法来执行进程中的任务。

join() 方法

join() 方法用于等待子进程完成。它会阻塞主进程,直到被调用的子进程执行完毕。

name 属性

name 属性用于获取或设置进程的名称。

pid 属性

pid 属性用于获取子进程的进程 ID。进程 ID 是操作系统为每个进程分配的唯一标识符。

一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from multiprocessing import Process
import time


def test():
print('test started')
time.sleep(2)
print('test finished')

def testOO(name):
print('test00 started')
time.sleep(2)
print("Name:",name)
print('test00 finished')


if __name__ == '__main__':
p = Process(target=test)
pOO = Process(target=testOO,args=('Bileton',))

p.start()
print("PID:", p.pid)
print("ProcNAME:", p.name)
p.join()

pOO.start()
print("PID:", pOO.pid)
print("ProcNAME:", pOO.name)
pOO.join()

print('Process finished')

>>>
PID: 41268
ProcNAME: Process-1
test started
test finished
PID: 24348
ProcNAME: Process-2
test00 started
Name: Bileton
test00 finished
Process finished

Queue类(队列)

Queue用于在不同进程之间传递数据。
Queue类是基于先进先出(FIFO)原则实现的,允许多个进程在队列中安全地放入和取出数据。

构造方法

创建队列

1
2
queue = Queue(maxsize=0)
# maxsize:指定队列的最大容量。默认值为 0,表示队列大小不限制。

put(item)

将 item 放入队列。

1
2
3
4
queue.put(item, block=True, timeout=None)
# item:要放入队列的对象。
# block:是否阻塞,默认为 True。如果为 False,则如果队列满了,put() 会抛出 Full 异常。
# timeout:如果设置为一个非零的数字,当 block 为 True 时,put() 将在等待指定时间后抛出异常。如果设置为 None(默认),则会无限期阻塞。

get()

从队列中取出一个对象。如果队列为空,默认情况下会阻塞直到队列有数据。

1
2
3
get(block=True, timeout=None)
# block:是否阻塞,默认为 True。如果为 False,且队列为空,会抛出 Empty 异常。
# timeout:如果设置了超时,get() 方法将在超时后抛出 Empty 异常。

empty()

返回队列是否为空。如果队列为空,返回 True;否则返回 False。

full()

返回队列是否已满。如果队列已满,返回 True;否则返回 False。

一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from multiprocessing import Process,Queue
import time

def producer(queue):
for i in range(5):
print(f"Producing {i}")
queue.put(i) # 将数据放入队列
time.sleep(1)

def consumer(queue):
while True:
item = queue.get() # 从队列中取出数据
if item == "STOP":
break
print(f"Consuming {item}")
time.sleep(2)

if __name__ == '__main__':
queue = Queue()
# 创建生产者和消费者进程
process_producer = Process(target=producer, args=(queue,),name='producer')
consumer_process = Process(target=consumer, args=(queue,),name='consumer')

# 启动进程
process_producer.start()
consumer_process.start()

# 等待生产者进程完成
process_producer.join()

# 使用 "STOP" 标志告诉消费者停止
queue.put("STOP")
print("queue put \"STOP\"")
# 等待消费者进程完成
consumer_process.join()
print("All Process Done")

>>>
Producing 0
Consuming 0
Producing 1
Producing 2
Consuming 1
Producing 3
Producing 4
Consuming 2
queue put "STOP"
Consuming 3
Consuming 4
All Process Done

Manage类

Manager类能够创建可以在多个进程之间共享的对象,并提供多进程之间同步和管理共享数据的功能。
通过Manager类可以创建如列表、字典、Namespace、Value、Array 等对象,并允许不同进程对其进行操作。

构造方法

创建Manager对象

1
2
from multiprocessing import Manager
manager = Manager()

Manager.dict()

返回一个共享字典,允许多个进程修改和访问字典的键值对。

Manager.list()

返回一个共享列表,允许多个进程修改和访问列表中的元素。

共享字典和列表示例

使用 Manager 类创建共享字典和列表,并在多个进程中操作它们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from multiprocessing import Process, Manager
import time


def update_dict(shared_dict):
for i in range(5):
shared_dict[i] = i * i
time.sleep(1)


def update_list(shared_list):
for i in range(5):
shared_list.append(i*i)
time.sleep(1)


if __name__ == '__main__':
manager = Manager()

# 创建共享字典和共享列表
shared_dict = manager.dict()
shared_list = manager.list()
# 创建进程
process_dict = Process(target=update_dict, args=(shared_dict,), name='process_dict')
process_list = Process(target=update_list, args=(shared_list,), name='process_list')
# 启动进程
process_list.start()
process_dict.start()
# 等待进程完成
process_list.join()
process_dict.join()

print("Shared dict: ", shared_dict)
print("Shared list: ", shared_list)

Pool

Pool是一个用于进程池管理的类,允许你通过并行化任务来高效地利用多核 CPU,自动管理多个进程的创建和销毁。

构造方法

1
2
3
4
5
pool = Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
# processes:指定池中进程的数量。如果为 None,则默认使用系统的 CPU 核心数。
# initializer:一个可选的函数,在进程池中的每个工作进程启动时调用一次。
# maxtasksperchild:指定每个进程处理的最大任务数,处理完这些任务后,进程将会被重启。这对于避免内存泄漏或处理长时间运行的进程非常有用。
# context:指定启动新进程时使用的上下文。一般情况下不需要指定。

apply(func, args=(), kwds={})

阻塞式方法,调用目标函数 func,并传递参数 args 和关键字参数 kwds。
该方法会等待任务执行完成,并返回结果。

apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

非阻塞式方法,调用目标函数 func,并传递参数 args 和 kwds。
apply_async 不会等待任务完成,而是立即返回一个 AsyncResult 对象,你可以通过该对象检查任务的状态、等待结果,或者在任务完成时获取回调。
callback:任务完成后调用的函数。
error_callback:任务出现异常时调用的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Pool

def square(x):
return x * x

def on_result(result):
print("Result:", result)

if __name__ == "__main__":
pool = Pool(4)
async_result = pool.apply_async(square, (10,), callback=on_result)
async_result.wait() # 等待任务完成

>>>
Result: 100

map(func, iterable, chunksize=None)

阻塞式方法,将 iterable 中的每个元素传递给目标函数 func。
map 会将任务并行化处理,并返回一个包含每个任务结果的列表。
chunksize:可选,指定将 iterable 切分为的任务块大小。

map_async(func, iterable, chunksize=None, callback=None, error_callback=None)

非阻塞式方法,类似于 map,但返回一个 AsyncResult 对象。
callback:任务完成后调用的函数。
error_callback:任务出错时调用的函数。

starmap(func, iterable, chunksize=None)

starmap()是 map() 方法的扩展,用于将多个参数传递给目标函数进行并行计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Pool

def multiply(x, y):
return x * y

if __name__ == "__main__":
pool = Pool(4) # 创建一个包含4个进程的进程池
numbers = [(1, 2), (2, 3), (3, 4), (4, 5), (5, 6)] # 每个元组包含两个参数

# 使用 starmap 函数并行计算
result = pool.starmap(multiply, numbers)

print(f"Results: {result}")
pool.close() # 关闭进程池,防止接受新任务
pool.join() # 等待所有进程完成

>>>
Results: [2, 6, 12, 20, 30]

close()

关闭进程池,不再接受新任务。
close() 后调用 join() 来等待所有进程完成。

join()

等待进程池中的所有进程完成任务后再退出。
示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Pool
import os

def square(x):
print(f"Process ID: {os.getpid()} processing {x}")
return x * x

if __name__ == "__main__":
pool = Pool(processes=4) # 创建一个包含4个进程的进程池
numbers = [1, 2, 3, 4, 5]

# 使用 map 函数并行计算
result = pool.map(square, numbers)

print(f"Results: {result}")
pool.close() # 关闭进程池,防止接受新任务
pool.join() # 等待所有进程完成

>>>
Process ID: 12345 processing 1
Process ID: 12346 processing 2
Process ID: 12347 processing 3
Process ID: 12348 processing 4
Results: [1, 4, 9, 16, 25]

多线程threading

Thread

构造方法

创建一个线程对象。

1
2
3
4
5
thread = Thread(target=None, name=None, args=(), kwargs=None)
# target:要在线程中调用的目标函数。
# name:线程的名称,用于标识线程。如果未指定,系统会自动分配一个唯一的名称。
# args:传递给目标函数的参数元组
# kwargs:传递给目标函数的关键字参数字典。

start()

start() 用于启动一个线程,让线程开始执行它的目标函数。
调用 start() 后,线程会进入 就绪状态,等待操作系统调度执行。

join()

join() 用于阻塞主线程,直到目标线程完成任务后才继续执行主线程的代码。

Lock

线程竞争问题

如下一部分代码,两个线程同时访问和修改全局变量i,会导致结果数据混乱。
由于线程的执行顺序和调度是由操作系统控制的,不同运行时,两个线程对 i 的操作可能以不同的顺序发生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from threading import Thread

i=0
def test1():
global i
for x in range(10000):
i+=x
i-=x
print("test1:",i)

def test2():
global i
for x in range(10000):
i+=x
i-=x
print("test2:",i)

if __name__ == "__main__":
thd1 = Thread(target=test1)
thd2 = Thread(target=test2)
thd1.start()
thd2.start()
thd1.join()
thd2.join()
print("end")

线程同步机制

为了确保共享资源的访问是线程安全的,可以使用线程锁(Lock)。

构造方法

1
lock = Lock()

acquire()

获取锁。如果锁已被其他线程占用,当前线程会阻塞直到锁释放。

release()

释放锁。释放后,其他等待锁的线程可以继续执行。

locked()

检查锁是否已被某个线程获取,返回 True 或 False。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from threading import Thread,Lock

i=0
lock = Lock()
def test1():
global i
lock.acquire()
for x in range(10000):
i+=x
i-=x
lock.release()
print("test1:",i)

def test2():
global i
lock.acquire()
for x in range(10000):
i+=x
i-=x
lock.release()
print("test2:",i)

if __name__ == "__main__":
thd1 = Thread(target=test1)
thd2 = Thread(target=test2)
thd1.start()
thd2.start()
thd1.join()
thd2.join()
print("end")

>>>
test1: 0
test2: 0
end

使用 with 语句管理锁

with 语句可以自动获取和释放锁,避免忘记调用 release() 导致死锁问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from threading import Thread,Lock

i=0
lock = Lock()
def test1():
global i
with lock:
for x in range(10000):
i+=x
i-=x
print("test1:",i)

def test2():
global i
with lock:
for x in range(10000):
i+=x
i-=x
print("test2:",i)

if __name__ == "__main__":
thd1 = Thread(target=test1)
thd2 = Thread(target=test2)
thd1.start()
thd2.start()
thd1.join()
thd2.join()
print("end")

>>>
test1: 0
test2: 0
end

Timer

定时执行任务

构造函数

1
2
3
4
5
threading.Timer(interval, function, args=None, kwargs=None)
# interval: 延迟的时间(单位:秒)。
# function: 延迟后执行的函数。
# args: 传递给函数的位置参数(可选)。
# kwargs: 传递给函数的关键字参数(可选)。

start()

启动定时器。

join()

阻塞主线程,直到定时器线程完成任务后再继续执行。
示例

1
2
3
4
5
6
7
8
9
10
11
12
import time
from threading import Thread,Lock,Timer

def run(t):
time.sleep(t)
print("wait",t)

if __name__ == "__main__":
timer = Timer(5,run,(5,))
timer.start()
timer.join()
print("end")

ThreadPoolExecutor

构造方法

1
2
3
4
5
6
7
8
9
10
11
concurrent.futures.ThreadPoolExecutor(
max_workers=None,
thread_name_prefix='',
initializer=None,
initargs=()
)

# max_workers: 池中最大线程数量。如果为 None,默认值是系统的 CPU 核心数。
# thread_name_prefix: 创建线程时线程名的前缀。便于调试和日志分析。
# initializer: 每个线程启动时调用的函数。可以用于线程的初始化操作。
# initargs: 传递给 initializer 的参数。如果指定了 initializer,则可以通过此参数为其提供初始参数。

submit()

使用 submit 提交任务

1
2
3
4
5
ThreadPoolExecutor.submit(fn, *args, **kwargs)
# fn: 任务函数(可调用对象),即要执行的任务。
# *args: 传递给 fn 的位置参数。
# **kwargs: 传递给 fn 的关键字参数。
# 返回值: 返回一个 Future 对象。Future 用于表示一个异步执行的操作,并且可以通过它获取任务的返回值或异常。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ThreadPoolExecutor

def task(n):
return f"Task {n} completed"

# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务
futures = [executor.submit(task, i) for i in range(5)]

# 获取结果
for future in futures:
print(future.result())
>>>
Task 0 completed
Task 1 completed
Task 2 completed
Task 3 completed
Task 4 completed

future

Future 是提交任务后返回的对象,通常通过 submit() 方法获得。它可以在任务完成之前就开始获取任务的状态或结果,提供了一种同步与异步混合编程的方式。

result()

result(timeout=None)
如果任务已经完成,返回任务的结果。如果任务抛出异常,则会抛出相应的异常。
timeout 参数指定最大等待时间,超时后会抛出 TimeoutError。

exception()

exception(timeout=None)
如果任务抛出了异常,返回异常。如果任务未抛出异常,则返回 None。
timeout 参数指定最大等待时间,超时后会抛出 TimeoutError。

done()

如果任务已经完成(无论成功或失败),返回 True,否则返回 False。

cancel()

尝试取消任务。如果任务已经开始执行,则无法取消,返回 False。如果任务还没有开始执行,则返回 True。

map()

使用 map 提交批量任务

1
2
3
4
5
6
ThreadPoolExecutor.map(func, *iterables, timeout=None, chunksize=1)
# func: 任务函数
# *iterables: 一个或多个可迭代对象(如列表、元组等)。
# timeout: 可选的最大等待时间(秒)。
# chunksize: 可选,指定每次提交给工作线程的任务数量,默认为 1。
# 返回值: 返回一个迭代器,该迭代器可以用于获取各个任务的结果。

示例

1
2
3
4
5
6
7
8
9
10
11
12
from concurrent.futures import ThreadPoolExecutor

def task(n):
return f"Task {n} completed"

# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 使用 map 提交任务
results = executor.map(task, range(5))

# 输出结果
print(list(results)) # 输出: ['Task 0 completed', 'Task 1 completed', 'Task 2 completed', 'Task 3 completed', 'Task 4 completed']

多参数处理

1
2
3
4
5
6
7
8
9
10
11
12
from concurrent.futures import ThreadPoolExecutor

def add(x, y):
return x + y

# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 使用 map 提交多个可迭代对象
results = executor.map(add, [1, 2, 3], [4, 5, 6])

# 输出结果
print(list(results)) # 输出: [5, 7, 9]

as_completed()

as_completed() 是 Python 中 concurrent.futures 模块的一个方法,提供了在任务完成时立即获取结果的功能。与 map() 方法不同,as_completed() 按照任务完成的顺序返回结果,而不是提交顺序。

1
2
3
4
concurrent.futures.as_completed(futures, timeout=None)
# futures: 可迭代对象,通常是由 submit() 方法返回的 Future 对象集合。
# timeout: 可选,指定最大等待时间(秒)。如果超出此时间,as_completed() 会抛出TimeoutError,不再等待未完成的任务。
# 返回值:返回一个迭代器,按任务完成的顺序逐个返回任务的结果或异常。每次返回一个 Future 对象,您可以使用 future.result() 获取其结果,或使用 future.exception() 获取任务异常。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from concurrent.futures import ThreadPoolExecutor

def task(n):
return f"Task {n} completed"

# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务
futures = [executor.submit(task, i) for i in range(5)]

# 使用 as_completed 按照任务完成顺序获取结果
for future in as_completed(futures):
print(future.result()) # 获取每个任务的结果

>>>
Task 0 completed
Task 1 completed
Task 2 completed
Task 3 completed
Task 4 completed

协程

协程是一种用户态的轻量级线程。

同步与异步

同步

  • 按照顺序依次执行每个任务,当前任务完成后再执行下一个任务。
  • 阻塞:当前任务必须等待前一个任务完成。
  • 适合计算密集型任务,简单的操作流程。
  • 在I/O操作时会造成性能瓶颈,整体效率较低。

异步

  • 任务可以并行执行,等待的任务不会阻塞其他任务。
  • 非阻塞:任务可以在等待时进行其他操作。
  • 适合I/O密集型任务,高并发请求,网络和文件I/O操作。
  • 提高了程序的并发性和效率,尤其在I/O密集型任务中尤为有效。

asyncio

coroutine

协程是用来表示异步任务的函数,在 Python 中是通过 async def 声明的异步函数。它们通常与 await 配合使用,以暂停和恢复任务的执行。

event_loop

事件循环是一个运行异步任务的机制,它负责调度和运行协程。事件循环会从任务队列中取出任务并执行它们,执行期间若遇到 await,会暂停当前任务并转去执行其他任务,直到任务完成。

get_event_loop()

asyncio.get_event_loop(),获取当前的事件循环。

run_until_complete()

用于运行指定的协程直到它完成,并返回协程的结果。

1
2
3
loop.run_until_complete(future)

# future:传入的 future 对象通常是一个协程(Coroutine),run_until_complete 会等待这个协程执行完成,并返回协程的结果。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

async def task():
print("Task started")
await asyncio.sleep(2) # 模拟耗时操作
print("Task finished")
return "Task result"

# 获取当前线程的事件循环
loop = asyncio.get_event_loop()

# 使用 run_until_complete 来运行协程并等待其完成
result = loop.run_until_complete(task())

print(f"Result: {result}")

# 关闭事件循环
loop.close()

>>>
Task started
Task finished
Result: Task result

await

await用于暂停当前协程的执行,直到某个异步操作(通常是协程、Future 或 Task)完成并返回结果。
await 只能在异步函数(用 async 定义的函数)中使用,它使得 Python 的异步代码更加简洁、易读。

task

任务是通过事件循环调度的协程。通过 asyncio.create_task() 可以创建一个任务,并提交给事件循环运行。

run()

asyncio.run() 是运行协程的简化方法,它负责启动事件循环并执行协程,通常用来启动程序的入口协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def test_sleep(): # 定义异步函数
print("Start sleeping")
await asyncio.sleep(1)
print("End sleeping")

if __name__ == '__main__':
cor = test_sleep()
asyncio.run(cor) # 启动时间循环并运行test_sleep()协程

>>>
Start sleeping
End sleeping

gather()

asyncio.gather() 可以同时执行多个协程,等待它们全部完成后返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio

async def task_A():
print("Task A started")
await asyncio.sleep(2)
print("Task A finished")

async def task_B():
print("Task B started")
await asyncio.sleep(1)
print("Task B finished")

async def main():
# 并发执行两个任务
await asyncio.gather(task_A(), task_B())

if __name__ == '__main__':
cor = main()
asyncio.run(cor)

>>>
Task A started
Task B started
Task B finished
Task A finished

create_task()

通过 asyncio.create_task() 创建任务,任务会自动调度到事件循环中执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio


async def task_A():
print("Task A started")
await asyncio.sleep(2)
print("Task A finished")


async def task_B():
print("Task B started")
await asyncio.sleep(1)
print("Task B finished")


async def main():
# 创建任务
task1 = asyncio.create_task(task_A())
task2 = asyncio.create_task(task_B())

# 等待任务完成
await task1
await task2

if __name__ == '__main__':
cor = main()
asyncio.run(cor)

>>>
Task A started
Task B started
Task B finished
Task A finished

wait()

它用于等待一组协程或任务的完成。当有多个异步任务需要等待时,能够在所有任务完成后再继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
asyncio.wait(
fs,
timeout=None,
return_when=ALL_COMPLETED
)

# fs:一个包含 Future 或 Task 对象的可迭代对象(如列表、集合等)。
# timeout:一个可选的超时时间,单位是秒。如果为 None,则会无限期等待。
# return_when:指定什么时候返回,可以取以下值:
# asyncio.FIRST_COMPLETED:任意一个任务完成时返回。
# asyncio.FIRST_EXCEPTION:任意一个任务异常时返回。
# asyncio.ALL_COMPLETED:所有任务完成时返回。
# 返回值:
# done:一个 Future 或 Task 对象的集合,表示已经完成的任务。
# pending:一个 Future 或 Task 对象的集合,表示尚未完成的任务。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio

async def task_1():
await asyncio.sleep(1)
return "任务 1 完成"

async def task_2():
await asyncio.sleep(2)
return "任务 2 完成"

async def task_3():
await asyncio.sleep(3)
return "任务 3 完成"

async def main():
tasks = [task_1(), task_2(), task_3()]

done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

for task in done:
print(task.result())

# 运行主函数
asyncio.run(main())

>>>
任务 1 完成
任务 2 完成
任务 3 完成

sleep()

asyncio.sleep() 是一个非阻塞的休眠方法,它不会阻塞事件循环,适用于模拟延时操作。

Semaphore()

Semaphore 对象维护一个内部计数器,这个计数器表示当前可用的“许可”。每当一个协程想要访问某个资源时,它需要获取一个许可(即递减计数器)。

1
2
3
import asyncio

sem = asyncio.Semaphore(value)

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio

# 创建一个最大并发数为2的信号量
sem = asyncio.Semaphore(2)

async def limited_task(task_id):
async with sem: # 在执行任务前,先获取信号量
print(f"任务 {task_id} 开始执行")
await asyncio.sleep(2) # 模拟长时间运行的任务
print(f"任务 {task_id} 执行完毕")

async def main():
tasks = [asyncio.create_task(limited_task(i)) for i in range(5)]
await asyncio.gather(*tasks)

# 运行主函数
asyncio.run(main())

>>>
任务 0 开始执行
任务 1 开始执行
任务 0 执行完毕
任务 1 执行完毕
任务 2 开始执行
任务 3 开始执行
任务 2 执行完毕
任务 3 执行完毕
任务 4 开始执行
任务 4 执行完毕

aiohttp

aiohttp专为异步 HTTP 请求设计,支持异步 HTTP 客户端和服务器功能。

get请求

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import aiohttp
import asyncio

# 异步函数,用于发送 GET 请求并获取响应
async def fetch(session, url):
# 使用异步上下文管理器发送 GET 请求
async with session.get(url) as response:
# 等待并返回响应内容
return await response.text()

# 主函数
async def main():
# 创建一个 aiohttp 客户端会话,用于管理连接池
async with aiohttp.ClientSession() as session:
# 调用 fetch 函数,异步获取网页内容
data = await fetch(session, "https://www.httpbin.org/headers")
# 打印获取到的响应内容
print(data)

# 运行事件循环并执行主函数
asyncio.run(main())

>>>
{
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.13 aiohttp/3.11.10",
"X-Amzn-Trace-Id": "Root=1-6754315d-47b2a5f425b27f9c291a1d25"
}
}

传递参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import aiohttp
import asyncio

# 主函数
async def main():
# 创建一个 aiohttp 客户端会话,用于管理连接池
async with aiohttp.ClientSession() as session:
# 定义 GET 请求的参数
params = {
"name" : "bileton" # 参数 name 的值为 'bileton'
}
# 使用异步上下文管理器发送 GET 请求,携带查询参数
async with session.get("https://www.httpbin.org/get", params=params) as response:
# 打印并等待返回的响应内容
print(await response.text()) # 返回响应的文本内容

# 运行事件循环并执行主函数
asyncio.run(main())

>>>
{
"args": {
"name": "bileton"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.13 aiohttp/3.11.10",
"X-Amzn-Trace-Id": "Root=1-675445c7-26e8b0e703e73a524d319146"
},
"origin": "219.156.133.197",
"url": "https://www.httpbin.org/get?name=bileton"
}

post请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import aiohttp
import asyncio


async def fetch(session, url):
# 异步请求
async with session.post(url,data="Bileton") as response:
# 返回响应
return await response.text()

async def main():
async with aiohttp.ClientSession() as session:
data = await fetch(session, "https://www.httpbin.org/post")
print(data)

asyncio.run(main())

>>>
{
"args": {},
"data": "Bileton",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br",
"Content-Length": "7",
"Content-Type": "text/plain; charset=utf-8",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.13 aiohttp/3.11.10",
"X-Amzn-Trace-Id": "Root=1-675439c8-4dc6dcb32d0fe49e1686cd1a"
},
"json": null,
"origin": "219.156.133.197",
"url": "https://www.httpbin.org/post"
}

aiofiles

1
2
3
4
5
6
7
8
9
import aiofiles
import asyncio

async def main():
async with aiofiles.open("filename","r",encoding="utf-8") as f:
data = await f.readlines()
print(data)

asyncio.run(main())