跳转至主要内容
Version: develop

使用 Taichi 加速 Python

Taichi 是一个嵌入在 Python 中的领域特定语言(DSL)。 Taichi 的主要功能之一是加速计算密集的 Python 程序,帮助这些程序 实现可以媲美 C/C++ 甚至 CUDA 的性能。 这使得 Taichi 在科学计算领域处于更有利的地位。

在下面的章节中,我们提供两个示例让你体验 Taichi 给 Python 程序带来的加速。

统计质数个数

Python 的大型 for 循环或嵌套 for 循环总是导致运行时性能不佳。 下面的 demo 计算在指定范围内的质数,并用到了嵌套 for 循环(完整版本见 此处)。 只需导入 Taichi 或切换到 Taichi 的 GPU 后端,就能看到整体性能的大幅提升:

"""Count the prime numbers in the range [1, n]
"""

# Checks if a positive integer is a prime number
def is_prime(n: int):
result = True
# Traverses the range between 2 and sqrt(n)
# - Returns False if n can be divided by one of them;
# - otherwise, returns True
for k in range(2, int(n ** 0.5) + 1):
if n % k == 0:
result = False
break
return result

# Traverses the range between 2 and n
# Counts the primes according to the return of is_prime()
def count_primes(n: int) -> int:
count = 0
for k in range(2, n):
if is_prime(k):
count += 1

return count

print(count_primes(1000000))
  1. 将代码保存为 count_prime.py 并在终端中运行以下命令:
time python count_primes.py

屏幕上将显示质数的个数和程序执行时间。 运行该程序需要 2.235 秒。

78498

real 0m2.235s
user 0m2.235s
sys 0m0.000s
  1. 现在,我们对代码稍作修改:导入 Taichi 到你的 Python 代码并在 CPU 后端初始化:
import taichi as ti
ti.init(arch=ti.cpu)
  1. @ti.func 装饰 is_prime(),并用 @ti.kernel 装饰 count_primes()
  • Taichi 的编译器将 @ti.kernel 装饰的 Python 代码编译到不同的设备上,例如 CPU 和 GPU,以实现高性能计算。
  • 请参阅 kernel 与函数 以详细了解 Taichi 的核心概念:kernel 与 Taichi 函数。
@ti.func
def is_prime(n: int):
result = True
for k in range(2, int(n ** 0.5) + 1):
if n % k == 0:
result = False
break
return result

@ti.kernel
def count_primes(n: int) -> int:
count = 0
for k in range(2, n):
if is_prime(k):
count += 1

return count
  1. 再次运行 count_primes.py
time python count_primes.py

计算速度是原先的 6 倍(2.235/0.363)。

78498

real 0m0.363s
user 0m0.546s
sys 0m0.179s
  1. N 乘十到 10,000,000 并再次运行 count_primes.py

    Taichi 的计算时间是 0.8 秒,而单独使用 Python 的计算时间是 55 秒。 Taichi 的计算速度是 Python 的 70 倍。

  2. 将 Taichi 的后端从 CPU 更改为 GPU 并重新运行:

ti.init(arch=ti.gpu)

Taichi 的计算时间是 0.45 秒,而单独使用 Python 的计算时间是 55 秒。 Taichi 进一步提速 120 倍。

动态规划:最长公共子序列

动态规划背后的核心理念是牺牲一些存储空间以减少执行时间,并储存中间结果以避免重复计算。 在下面一节中,我们将完整演示动态规划的实现,这是 Taichi 能够显著“加速”的另一个领域。

下面的例子运用动态规划的理念,求出两个给定序列的最长共同公共子序列(LCS)的长度。 何为 LCS?如现有两序列:a = [0, 1, 0, 2, 4, 3, 1, 2, 1],b = [4, 0, 1, 4, 5, 3, 1, 2],则两序列的 LCS 是 [0, 1, 4, 3, 1, 2],其长度为 6。 让我们正式开始:

  1. 导入 NumPy 和 Taichi 到 Python 程序中:
import taichi as ti
import numpy as np
  1. 初始化 Taichi:
ti.init(arch=ti.cpu)
  1. 创建两个长度同为 15,000 的 NumPy 数组,其中元素为 [0, 100] 范围内的随机整数,并比较:
N = 15000
a_numpy = np.random.randint(0, 100, N, dtype=np.int32)
b_numpy = np.random.randint(0, 100, N, dtype=np.int32)
  1. 这里,我们定义一个 N×NTaichi field f,用它的第 [i, j] 个元素代表序列 ai 个元素和序列 bj 个元素的 LCS 长度。
f = ti.field(dtype=ti.i32, shape=(N + 1, N + 1))
  1. 现在,我们将动态规划问题转变为了对 field f 的遍历问题,其中对序列 ab 进行了比较:
f[i, j] = max(f[i - 1, j - 1] + (a[i - 1] == b[j - 1]),
max(f[i - 1, j], f[i, j - 1]))
  1. 定义一个 kernel 函数 compute_lcs(),传入两序列,得到 LCS 的长度。
@ti.kernel
def compute_lcs(a: ti.types.ndarray(), b: ti.types.ndarray()) -> ti.i32:
len_a, len_b = a.shape[0], b.shape[0]

ti.loop_config(serialize=True) # Disable auto-parallelism in Taichi
for i in range(1, len_a + 1):
for j in range(1, len_b + 1):
f[i, j] = max(f[i - 1, j - 1] + (a[i - 1] == b[j - 1]),
max(f[i - 1, j], f[i, j - 1]))

return f[len_a, len_b]
  • Taichi 将 NumPy 数组储存为 ndarray。
  • 确保设置 ti.loop_config(serialize=True) 以禁用 Taichi 的自动并行功能。 此处的迭代不应并行进行,因为每个循环的迭代计算都取决于前一个循环的迭代结果。
  1. 打印 compute_lcs(a_numpy, b_numpy) 的结果。 现在可得如下程序:
import taichi as ti
import numpy as np

ti.init(arch=ti.cpu)

benchmark = True

N = 15000

f = ti.field(dtype=ti.i32, shape=(N + 1, N + 1))

if benchmark:
a_numpy = np.random.randint(0, 100, N, dtype=np.int32)
b_numpy = np.random.randint(0, 100, N, dtype=np.int32)
else:
a_numpy = np.array([0, 1, 0, 2, 4, 3, 1, 2, 1], dtype=np.int32)
b_numpy = np.array([4, 0, 1, 4, 5, 3, 1, 2], dtype=np.int32)

@ti.kernel
def compute_lcs(a: ti.types.ndarray(), b: ti.types.ndarray()) -> ti.i32:
len_a, len_b = a.shape[0], b.shape[0]

ti.loop_config(serialize=True) # Disable auto-parallelism in Taichi
for i in range(1, len_a + 1):
for j in range(1, len_b + 1):
f[i, j] = max(f[i - 1, j - 1] + (a[i - 1] == b[j - 1]),
max(f[i - 1, j], f[i, j - 1]))

return f[len_a, len_b]


print(compute_lcs(a_numpy, b_numpy))
  1. 将以上代码保存为 lcs.py,并运行:
time python lcs.py

系统打印出 LCS 的长度以及程序执行时间。

2721

real 0m1.409s
user 0m1.112s
sys 0m0.549s

此仓库 提供了此动态规划算法分别在 Taichi 和 NumPy 中的实现。

  • 只使用 Python,需要 476 秒才能得到两个长度均为 15,000 的随机序列的 LCS 长度。
  • 使用 Taichi,只需要大约 0.9 秒,较 Python 快高达 500 倍!
note

实际执行时间可能根据你使用的机器有所变化,但我们相信 Taichi 带来的性能提升幅度是与本文相似的。