跳转至主要内容
Version: v1.6.0

使用 Taichi 加速 Python

Taichi 是一个嵌入在 Python 中的领域特定语言(DSL)。 Taichi 的主要功能之一是加速计算密集的 Python 程序,帮助这些程序 实现可以媲美 C/C++ 甚至 CUDA 的性能。 这使得 Taichi 在科学计算领域处于更有利的地位。

在下面的章节中,我们提供两个示例让你体验 Taichi 给 Python 程序带来的加速。

统计质数个数

Python 的大型 for 循环或嵌套 for 循环总是导致运行时性能不佳。 下面的 demo 计算在指定范围内的质数,并用到了嵌套 for 循环(完整版本见 此处)。 只需导入 Taichi 或切换到 Taichi 的 GPU 后端,就能看到整体性能的大幅提升:

"""Count the prime numbers in the range [1, n]
"""

# Checks if a positive integer is a prime number
def is_prime(n: int):
result = True
# Traverses the range between 2 and sqrt(n)
# - Returns False if n can be divided by one of them;
# - otherwise, returns True
for k in range(2, int(n ** 0.5) + 1):
if n % k == 0:
result = False
break
return result

# Traverses the range between 2 and n
# Counts the primes according to the return of is_prime()
def count_primes(n: int) -> int:
count = 0
for k in range(2, n):
if is_prime(k):
count += 1

return count

print(count_primes(1000000))
  1. 将代码保存为 count_prime.py 并在终端中运行以下命令:
time python count_primes.py

屏幕上将显示质数的个数和程序执行时间。 运行该程序需要 2.235 秒。

78498

real 0m2.235s
user 0m2.235s
sys 0m0.000s
  1. 现在,我们对代码稍作修改:导入 Taichi 到你的 Python 代码并在 CPU 后端初始化:
import taichi as ti
ti.init(arch=ti.cpu)
  1. @ti.func 装饰 is_prime(),并用 @ti.kernel 装饰 count_primes()
  • Taichi 的编译器将 @ti.kernel 装饰的 Python 代码编译到不同的设备上,例如 CPU 和 GPU,以实现高性能计算。
  • 请参阅 kernel 与函数 以详细了解 Taichi 的核心概念:kernel 与 Taichi 函数。
@ti.func
def is_prime(n: int):
result = True
for k in range(2, int(n ** 0.5) + 1):
if n % k == 0:
result = False
break
return result

@ti.kernel
def count_primes(n: int) -> int:
count = 0
for k in range(2, n):
if is_prime(k):
count += 1

return count
  1. 再次运行 count_primes.py
time python count_primes.py

计算速度是原先的 6 倍(2.235/0.363)。

78498

real 0m0.363s
user 0m0.546s
sys 0m0.179s
  1. N 乘十到 10,000,000 并再次运行 count_primes.py

    Taichi 的计算时间是 0.8 秒,而单独使用 Python 的计算时间是 55 秒。 Taichi 的计算速度是 Python 的 70 倍。

  2. 将 Taichi 的后端从 CPU 更改为 GPU 并重新运行:

ti.init(arch=ti.gpu)

Taichi 的计算时间是 0.45 秒,而单独使用 Python 的计算时间是 55 秒。 Taichi 进一步提速 120 倍。

动态规划:最长公共子序列

动态规划背后的核心理念是牺牲一些存储空间以减少执行时间,并储存中间结果以避免重复计算。 在下面一节中,我们将完整演示动态规划的实现,这是 Taichi 能够显著“加速”的另一个领域。

下面的例子运用动态规划的理念,求出两个给定序列的最长共同公共子序列(LCS)的长度。 何为 LCS?如现有两序列:a = [0, 1, 0, 2, 4, 3, 1, 2, 1],b = [4, 0, 1, 4, 5, 3, 1, 2],则两序列的 LCS 是 [0, 1, 4, 3, 1, 2],其长度为 6。 让我们正式开始:

  1. 导入 NumPy 和 Taichi 到 Python 程序中:
import taichi as ti
import numpy as np
  1. 初始化 Taichi:
ti.init(arch=ti.cpu)
  1. 创建两个长度同为 15,000 的 NumPy 数组,其中元素为 [0, 100] 范围内的随机整数,并比较:
N = 15000
a_numpy = np.random.randint(0, 100, N, dtype=np.int32)
b_numpy = np.random.randint(0, 100, N, dtype=np.int32)
  1. 这里,我们定义一个 N×NTaichi field f,用它的第 [i, j] 个元素代表序列 ai 个元素和序列 bj 个元素的 LCS 长度。
f = ti.field(dtype=ti.i32, shape=(N + 1, N + 1))
  1. 现在,我们将动态规划问题转变为了对 field f 的遍历问题,其中对序列 ab 进行了比较:
f[i, j] = ti.max(f[i - 1, j - 1] + (a[i - 1] == b[j - 1]),
ti.max(f[i - 1, j], f[i, j - 1]))
  1. 定义一个 kernel 函数 compute_lcs(),传入两序列,得到 LCS 的长度。
@ti.kernel
def compute_lcs(a: ti.types.ndarray(), b: ti.types.ndarray()) -> ti.i32:
len_a, len_b = a.shape[0], b.shape[0]

ti.loop_config(serialize=True) # Disable auto-parallelism in Taichi
for i in range(1, len_a + 1):
for j in range(1, len_b + 1):
f[i, j] = ti.max(f[i - 1, j - 1] + (a[i - 1] == b[j - 1]),
ti.max(f[i - 1, j], f[i, j - 1]))

return f[len_a, len_b]
  • Taichi 将 NumPy 数组储存为 ndarray。
  • 确保设置 ti.loop_config(serialize=True) 以禁用 Taichi 的自动并行功能。 此处的迭代不应并行进行,因为每个循环的迭代计算都取决于前一个循环的迭代结果。
  1. 打印 compute_lcs(a_numpy, b_numpy) 的结果。 现在可得如下程序:
import taichi as ti
import numpy as np

ti.init(arch=ti.cpu)

benchmark = True

N = 15000

f = ti.field(dtype=ti.i32, shape=(N + 1, N + 1))

if benchmark:
a_numpy = np.random.randint(0, 100, N, dtype=np.int32)
b_numpy = np.random.randint(0, 100, N, dtype=np.int32)
else:
a_numpy = np.array([0, 1, 0, 2, 4, 3, 1, 2, 1], dtype=np.int32)
b_numpy = np.array([4, 0, 1, 4, 5, 3, 1, 2], dtype=np.int32)

@ti.kernel
def compute_lcs(a: ti.types.ndarray(), b: ti.types.ndarray()) -> ti.i32:
len_a, len_b = a.shape[0], b.shape[0]

ti.loop_config(serialize=True) # Disable auto-parallelism in Taichi
for i in range(1, len_a + 1):
for j in range(1, len_b + 1):
f[i, j] = ti.max(f[i - 1, j - 1] + (a[i - 1] == b[j - 1]),
ti.max(f[i - 1, j], f[i, j - 1]))

return f[len_a, len_b]


print(compute_lcs(a_numpy, b_numpy))
  1. 将以上代码保存为 lcs.py,并运行:
time python lcs.py

系统打印出 LCS 的长度以及程序执行时间。

2721

real 0m1.409s
user 0m1.112s
sys 0m0.549s

此仓库 提供了此动态规划算法分别在 Taichi 和 NumPy 中的实现。

  • 只使用 Python,需要 476 秒才能得到两个长度均为 15,000 的随机序列的 LCS 长度。
  • 使用 Taichi,只需要大约 0.9 秒,较 Python 快高达 500 倍!
note

The actual execution time may vary depending on your machine, but we believe that the performance improvements you will see is comparable to ours.