教程:使用Theta扫描管道的端到端研究工作流¶
阅读时间:约30-35分钟 难度:初级到中级 前置条件:对导航轨迹的基本理解
本教程演示如何使用ThetaSweepPipeline对实验轨迹数据进行完整的端到端分析,无需深入了解CANN实现细节。
目录¶
管道简介
快速开始:基本管道使用
加载外部轨迹数据
高级自定义
完整研究示例
后续步骤
1. 管道介绍¶
1.1 什么是管道?¶
管道 为完整的分析提供了一个高级接口,无需实现细节:
[ ]:
# 传统方法(手动)
1. 加载数据
2. 创建网络
3. 运行模拟
4. 计算分析
5. 生成可视化
6. 保存结果
# 管道方法(自动化)
pipeline = ThetaSweepPipeline(trajectory_data, times)
results = pipeline.run() # 全部完成!
1.2 为什么使用管道?¶
对于实验神经科学家:
无需理解CANN数学原理
专注于您的数据和研究问题
可重复、标准化的分析
对于计算研究人员:
快速原型设计
参数扫描和批处理
一致的输出格式
1.3 ThetaSweepPipeline 概述¶
ThetaSweepPipeline 实现完整的theta扫描分析:
输入:
轨迹数据(位置随时间的变化)
时序信息
自动处理:
方向细胞网络模拟
网格细胞网络模拟
Theta调制计算
群体活动分析
输出:
轨迹分析图
Theta扫描动画
群体活动可视化
用于自定义分析的原始模拟数据
2. 快速开始:基本管道用法¶
2.1 最小示例¶
最简单的用法 - 仅包含轨迹和时间:
[ ]:
import numpy as np
from canns.pipeline import ThetaSweepPipeline
# 示例:加载你的实验轨迹
# positions: 形状 (n_steps, 2) - [x, y] 坐标
# times: 形状 (n_steps,) - 以秒为单位的时间戳
positions = np.load('my_trajectory.npy') # 你的数据
times = np.load('my_times.npy')
# 运行完整分析(只需一行!)
pipeline = ThetaSweepPipeline(
trajectory_data=positions,
times=times
)
results = pipeline.run(output_dir="results/")
print(f"动画已保存到: {results['animation_path']}")
print(f"分析图表位于: results/")
就这样!管道会自动处理所有事情。
2.2 理解输出结果¶
运行后,你将在输出目录中找到:
results/
├── trajectory_analysis.png # 轨迹路径和统计信息
├── population_activity_hd.png # 方向细胞活动
├── population_activity_gc.png # 网格细胞活动
├── theta_sweep_animation.gif # 完整动力学动画
└── simulation_data.npz # 用于自定义分析的原始数据
2.3 快速数据格式检查¶
你的轨迹数据应该是:
[ ]:
# positions: (n_steps, 2) 数组
print(f"位置形状: {positions.shape}") # 应为 (N, 2)
print(f"位置范围 X: [{positions[:,0].min()}, {positions[:,0].max()}]")
print(f"位置范围 Y: [{positions[:,1].min()}, {positions[:,1].max()}]")
# times: (n_steps,) 数组
print(f"时间形状: {times.shape}") # 应为 (N,)
print(f"持续时间: {times[-1] - times[0]:.2f}s")
print(f"平均 dt: {np.mean(np.diff(times)):.4f}s")
常见问题:
位置单位不是米?相应地缩放
时间单位不是秒?先转换
非均匀采样?管道自动处理
3. 加载外部轨迹数据¶
3.1 从 CSV 文件加载¶
[ ]:
import pandas as pd
# 从CSV加载
df = pd.read_csv('trajectory.csv')
# 提取位置和时间
positions = df[['x', 'y']].values # (n_steps, 2)
times = df['time'].values # (n_steps,)
# 运行管道
pipeline = ThetaSweepPipeline(positions, times)
results = pipeline.run()
CSV 格式示例:
time,x,y
0.000,0.5,0.5
0.001,0.501,0.502
0.002,0.503,0.505
...
3.2 从 MATLAB 文件导入¶
[ ]:
from scipy.io import loadmat
# 加载MATLAB文件
data = loadmat('trajectory.mat')
positions = data['positions'] # 已经是 (n_steps, 2)
times = data['times'].flatten() # 如果需要则展平
pipeline = ThetaSweepPipeline(positions, times)
results = pipeline.run()
3.3 来自跟踪软件¶
跟踪系统的常见格式:
[ ]:
# DeepLabCut输出
import pandas as pd
dlc_data = pd.read_csv('tracking_output.csv', header=[0,1,2])
x = dlc_data[('bodypart1', 'x')].values
y = dlc_data[('bodypart1', 'y')].values
positions = np.column_stack([x, y])
# Bonsai输出
bonsai_data = pd.read_csv('bonsai_tracking.csv')
positions = bonsai_data[['X', 'Y']].values / 100 # 将厘米转换为米
# 自定义追踪
# 始终确保:位置单位为米,时间单位为秒
3.4 合成测试数据¶
用于测试或演示:
[ ]:
def create_test_trajectory(n_steps=1000, dt=0.002):
"""创建平滑的测试轨迹"""
times = np.linspace(0, (n_steps-1)*dt, n_steps)
# 圆形轨迹加上一些噪声
t_param = np.linspace(0, 4*np.pi, n_steps)
radius = 0.5
center = np.array([0.75, 0.75])
x = center[0] + radius * np.cos(t_param)
y = center[1] + radius * np.sin(t_param)
# 添加小噪声
noise = np.random.normal(0, 0.01, (n_steps, 2))
positions = np.column_stack([x, y]) + noise
return positions, times
# 测试管道
positions, times = create_test_trajectory()
pipeline = ThetaSweepPipeline(positions, times, env_size=1.5)
results = pipeline.run(output_dir="test_results/")
4. 高级自定义¶
4.1 环境配置¶
调整环境参数:
[ ]:
pipeline = ThetaSweepPipeline(
trajectory_data=positions,
times=times,
env_size=2.0, # 环境大小(米)
dt=0.001, # 仿真时间步长(秒)
)
何时调整:
env_size: 匹配你的实验竞技场 (1m, 2m, 等)dt: 对于非常快的运动使用更细的时间分辨率
4.2 网络参数¶
自定义方向和网格细胞网络:
[ ]:
pipeline = ThetaSweepPipeline(
trajectory_data=positions,
times=times,
# 方向细胞网络
direction_cell_params={
'num': 100, # 方向细胞数量
'adaptation_strength': 15.0, # SFA强度
'noise_strength': 0.0, # 活动噪声
},
# 网格细胞网络
grid_cell_params={
'num_gc_x': 100, # 每个维度的网格细胞数
'adaptation_strength': 8.0, # SFA强度
'mapping_ratio': 5, # 网格间距控制
'phase_offset': 1.0/20, # Theta扫过幅度
},
)
参数效果:
更高的
adaptation_strength:更强的θ振荡更大的
mapping_ratio:更小的网格间距更大的
phase_offset:更大的θ扫描
4.3 Theta调制设置¶
控制θ节律参数:
[ ]:
pipeline = ThetaSweepPipeline(
trajectory_data=positions,
times=times,
theta_params={
'theta_strength_hd': 1.0, # 头向细胞调制强度
'theta_strength_gc': 0.5, # 网格细胞调制强度
'theta_cycle_len': 100.0, # 周期长度(毫秒)
},
)
生物学对应关系:
theta_cycle_len=100ms→ 10 Hz theta频率theta_cycle_len=125ms→ 8 Hz theta频率
4.4 动画和可视化¶
自定义输出可视化:
[ ]:
results = pipeline.run(
output_dir="custom_results/",
save_animation=True, # 生成动画
save_plots=True, # 保存分析图表
animation_fps=10, # 动画帧率
animation_n_step=20, # 每N帧采样一次
verbose=True, # 打印进度
)
性能提示:
降低
animation_fps以获得更小的文件提高
animation_n_step以加快生成速度设置
save_animation=False跳过动画生成(更快)
5. 完整的研究示例¶
5.1 现实场景¶
分析一个记录的导航会话:
[ ]:
import numpy as np
import pandas as pd
from canns.pipeline import ThetaSweepPipeline
# 加载实验数据
print("正在加载实验轨迹...")
df = pd.read_csv('experiment_2024_session_3.csv')
# 提取和预处理
positions = df[['x_cm', 'y_cm']].values / 100 # 将厘米转换为米
times = df['timestamp_ms'].values / 1000 # 将毫秒转换为秒
# 数据质量检查
print(f"轨迹: {len(positions)} 个样本")
print(f"持续时间: {times[-1] - times[0]:.2f}s")
print(f"位置范围: X[{positions[:,0].min():.2f}, {positions[:,0].max():.2f}]m, "
f"Y[{positions[:,1].min():.2f}, {positions[:,1].max():.2f}]m")
# 移除任何NaN值(跟踪失败)
valid = ~np.isnan(positions).any(axis=1)
positions = positions[valid]
times = times[valid]
print(f"有效样本: {len(positions)}")
5.2 运行综合分析¶
[ ]:
# 使用实验参数配置管道
pipeline = ThetaSweepPipeline(
trajectory_data=positions,
times=times,
env_size=1.5, # 1.5m x 1.5m 竞技场
direction_cell_params={
'num': 100,
'adaptation_strength': 15.0,
'noise_strength': 0.05, # 用于真实性的小噪声
},
grid_cell_params={
'num_gc_x': 100,
'adaptation_strength': 8.0,
'mapping_ratio': 5,
},
theta_params={
'theta_strength_hd': 1.0,
'theta_strength_gc': 0.5,
'theta_cycle_len': 100.0, # 10 Hz
},
)
# 运行分析
print("\n正在运行theta扫描分析...")
results = pipeline.run(
output_dir="experiment_analysis/",
save_animation=True,
save_plots=True,
animation_fps=15,
verbose=True,
)
print("\n✅ 分析完成!")
print(f"📊 结果已保存到: experiment_analysis/")
print(f"🎬 动画: {results['animation_path']}")
5.3 自定义后处理¶
访问原始仿真数据进行自定义分析:
[ ]:
# 加载模拟结果
sim_data = results['simulation_data']
# 可用数据
dc_activity = sim_data['dc_activity'] # 方向细胞放电
gc_activity = sim_data['gc_activity'] # 网格细胞放电
theta_phase = sim_data['theta_phase'] # 随时间的θ相位
internal_pos = sim_data['internal_position'] # 解码位置
# 示例分析:相位前进
import matplotlib.pyplot as plt
# 寻找高活动期
activity_threshold = gc_activity.mean() + gc_activity.std()
high_activity = gc_activity.max(axis=1) > activity_threshold
# 绘制高活动期间的相位对时间的关系
plt.figure(figsize=(12, 4))
plt.scatter(times[high_activity], theta_phase[high_activity],
c=gc_activity[high_activity].max(axis=1),
cmap='viridis', s=10, alpha=0.6)
plt.xlabel('时间 (s)')
plt.ylabel('Theta相位 (rad)')
plt.title('网格细胞高活动期间的相位前进')
plt.colorbar(label='Max Activity')
plt.savefig('experiment_analysis/phase_precession.png', dpi=150)
plt.show()
# 比较解码位置与实际位置
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(positions[:,0], positions[:,1], 'k-', alpha=0.5, label='Actual')
plt.plot(internal_pos[:,0], internal_pos[:,1], 'r-', alpha=0.7, label='Decoded')
plt.xlabel('X位置 (m)')
plt.ylabel('Y位置 (m)')
plt.title('位置跟踪精度')
plt.legend()
plt.axis('equal')
plt.subplot(1, 2, 2)
error = np.linalg.norm(positions - internal_pos, axis=1)
plt.plot(times, error, 'b-', alpha=0.7)
plt.xlabel('时间 (s)')
plt.ylabel('位置误差 (m)')
plt.title('随时间的解码误差')
plt.tight_layout()
plt.savefig('experiment_analysis/position_accuracy.png', dpi=150)
plt.show()
print(f"\n📈 平均位置误差: {error.mean():.3f}m")
print(f"📈 最大位置误差: {error.max():.3f}m")
5.4 批量处理多个会话¶
处理多个实验会话:
[ ]:
import glob
from pathlib import Path
# 查找所有会话文件
session_files = glob.glob('experiments/session_*.csv')
print(f"找到 {len(session_files)} 个会话需要处理")
# 处理每个会话
for session_file in session_files:
session_name = Path(session_file).stem
print(f"\n正在处理 {session_name}...")
# 加载数据
df = pd.read_csv(session_file)
positions = df[['x_cm', 'y_cm']].values / 100
times = df['timestamp_ms'].values / 1000
# 运行管道
pipeline = ThetaSweepPipeline(positions, times, env_size=1.5)
output_dir = f"batch_results/{session_name}/"
results = pipeline.run(
output_dir=output_dir,
save_animation=False, # 为了加速跳过动画
save_plots=True,
verbose=False,
)
print(f" ✓ 完成: {output_dir}")
print("\n🎉 批量处理完成!")
6. 后续步骤¶
恭喜!您已经学会了如何使用 ThetaSweepPipeline 进行端到端的 theta 扫描分析。
主要要点¶
管道简化工作流 - 复杂模型的一行代码分析
灵活的数据加载 - CSV、MATLAB、跟踪软件
自动输出 - 图表、动画和原始数据
可定制参数 - 需要时获得完全控制
批处理 - 高效地分析多个会话
何时使用管道¶
适合以下情况:
没有编码专业知识的实验神经科学家
快速原型设计和探索性分析
标准化处理多个数据集
生成出版质量的图表
教学和演示
考虑手动方法的情况:
需要非标准模型架构
实现新的分析方法
需要对每一步进行细粒度控制
扩展管道功能
管道功能总结¶
功能 |
基本用法 |
高级用法 |
|---|---|---|
数据输入 |
|
多种格式加载器 |
环境 |
自动检测 |
可定制的 |
网络 |
默认参数 |
完整参数字典 |
Theta |
默认 10 Hz |
自定义频率、强度 |
输出 |
标准图表 |
原始数据 + 自定义分析 |
批处理 |
单个会话 |
多会话处理 |
继续学习¶
扩展管道¶
想要修改或扩展管道?
检查源代码:
canns.pipeline.theta_sweep.ThetaSweepPipeline继承和自定义:
from canns.pipeline import ThetaSweepPipeline class MyCustomPipeline(ThetaSweepPipeline): def custom_analysis(self): # 在这里添加您的分析 pass
贡献: 通过 GitHub pull requests 提交增强功能
获取帮助¶
示例: 查看
examples/pipeline/目录问题: GitHub Issues
社区: GitHub 上的讨论和问答
最佳实践¶
数据质量第一: 在管道之前清理跟踪数据
从简单开始: 最初使用默认参数
验证输出: 检查轨迹图的合理性
文档参数: 保存配置以实现可重现性
版本控制: 跟踪用于每项分析的管道版本
感谢您完成 CANN 教程系列!您现在拥有 CANN 建模、脑启发式学习和实际研究工作流的全面知识。