并行计算框架
概述
GTB 文件以变异位点作为最小储存单位,同时它包含着染色体层级信息以及 GTB 文件的自然分块(按照块大小或限制的最大数据量),基于 GTB 格式的并行计算非常容易进行,并具有多种可选的级别。
- 染色体水平并行:开启多个子线程,每个子线程处理其中的一个染色体数据,完成任务后按照染色体拼接数据。适用于需要扫描同个染色体上下游位点的场景(例如 LD 计算)。由于每个染色体包含的位点量不相同,这种并行方式可能导致线程任务量不均等,并且在文件仅包含单个染色体时并行不起作用的问题。
- 块水平并行计算:按照 GTB 的分块方式进行并行计算,每一个(或多个少量的)块分配给一个子线程进行处理,处理完成后通过 CLM 算法输出到文件或使用其他方式保存数据。适用于计算任务仅聚焦于单个位点,并且需要有序、大量输出到文件、不使用额外磁盘空间的情形(例如变异位点查询或样本筛选)。由于 CLM 输出算法需要使用内存暂时保存数据,这种并行方式需要严格控制内存量。
- 变异位点水平并行计算:将整个 GTB 文件按照位点数均等地分为多个子部份,每一个子部份由一个子线程进行处理。适用于计算任务仅聚焦于单个位点的场景,这种并行方式能够满足大部分应用需求。
以上并行方式可以相互组合,例如按照染色体水平执行任务,对每个染色体都使用变异位点水平并行计算。
块水平并行计算通常被用于 GTB 文件的格式转换,例如 GTB 格式转换为 VCF 格式 和 GTB 格式转换为 TSV 格式,由于涉及到较为复杂的内存控制和输出控制,在此处不进行介绍。
染色体水平并行计算
染色体水平并行计算通过主线程分配需要处理的染色体对象,每个子线程依次处理相应的染色体。以下为有序 GTB 文件按照染色体水平并行计算的示例程序:
int nThreads = 6;
// 打开文件对象
GTBManager manager = GTBManager.load("<path_to_GTB>");
// 开启线程池并行计算
ThreadPool threadPool = new ThreadPool(nThreads);
// 获取该文文件包含的染色体对象
BaseArray<Chromosome> chromosomes = new Array<>(manager.getIndexer().getChromosomes());
threadPool.submit(() -> {
// 每个子线程开启一个 GTBReader 实例, 请注意是否需要加载基因型、补充字段
GTBReader reader = manager.instanceReader();
Chromosome chromosome;
while (true) {
synchronized (chromosomes) {
if (chromosomes.size() > 0) {
chromosome = chromosomes.popFirst();
} else {
break;
}
}
// 限制 GTBReader 只能读取指定染色体范围的数据
reader.limit(manager.getIndexer().getVariantIndexRange(chromosome));
reader.seek(0);
// 按行读取位点
Variant variant;
while ((variant = reader.read()) != null) {
// 获取基因型序列
Genotypes genotypes = (Genotypes) variant.getProperty(IGenotypes.class);
// TODO
}
}
reader.close();
return true;
}, nThreads);
threadPool.close();
GTB 文件有序时,GTBIndexer 索引器精确地识别每个染色体的位点的指针范围,这种并行方式不会产生额外的 IO 开销。使用该框架的计算程序实例是 LDCalculator。
当 GTB 文件无序时 manager.getIndexer().getChromosomes()
将抛出异常。一种解决策略是使用 GTBSorter 进行排序,再进行计算流程。另一种解决策略是通过 GTBFilter 限制染色体:
int nThreads = 6;
// 打开文件对象
GTBManager manager = GTBManager.load("<path_to_GTB>");
// 开启线程池并行计算
ThreadPool threadPool = new ThreadPool(getThreads());
// 由于此时不知道文件包含多少染色体, 因此列出所有的可选的染色体
BaseArray<Chromosome> chromosomes = new Array<>(Chromosome.identifiableList());
threadPool.submit(() -> {
// 每个子线程开启一个 GTBReader 实例, 请注意是否需要加载基因型、补充字段
GTBReader reader = manager.instanceReader();
Chromosome chromosome;
GTBFilter filter = new GTBFilter();
while (true) {
synchronized (chromosomes) {
if (chromosomes.size() > 0) {
chromosome = chromosomes.popFirst();
} else {
break;
}
}
// 限制 GTBReader 只能读取指定染色体范围的数据
filter.clear();
filter.filterByChromosome(chromosome);
reader.seek(0);
Variant variant;
while ((variant = reader.read(filter)) != null) {
// 获取基因型序列
Genotypes genotypes = (Genotypes) variant.getProperty(IGenotypes.class);
// TODO
}
}
reader.close();
return true;
}, nThreads);
threadPool.close();
在这种场景下,即便 GTB 文件仅包含一个染色体,也会使得坐标字段被扫描多次,仅适用于编写通用计算程序时使用。
变异位点水平并行计算
变异位点水平的并行计算借助 GTBReader 的 part(int nThreads)
进行,它将 GTBReader 的可读指针范围均等地划分为多个子部份,每一个子部份由一个子线程进行独立处理。以下为 GTB 文件按照变异位点水平并行计算的示例程序:
int nThreads = 6;
// 打开文件对象
GTBManager manager = GTBManager.load("<path_to_GTB>");
// 开启线程池并行计算
ThreadPool threadPool = new ThreadPool(nThreads);
// 请注意是否需要加载基因型、补充字段
BaseArray<GTBReader> readers = manager.instanceReader().part(nThreads);
// 线程编号
AtomicInteger threadId = new AtomicInteger(0);
threadPool.submit(() -> {
// 获取当前线程 id, 并获取对应的分部读取器
int localThreadId = threadId.getAndIncrement();
GTBReader reader = readers.get(localThreadId);
// 按行读取位点
Variant variant;
while ((variant = reader.read()) != null) {
// 获取基因型序列
Genotypes genotypes = (Genotypes) variant.getProperty(IGenotypes.class);
// TODO
}
reader.close();
return true;
}, readers.size());
threadPool.close();
使用该框架的计算程序实例是 edu.sysu.pmglab.gbc.GTBExporter。