""" 按组挂载 Git 资源(只读) + 自动建用户加组 资源路径 /home/zhangyi/jupyter-collection 控制文件 /home/zhangyi/jupyter-collection/.hub/resource_map.ini """ import os, grp, pwd, shutil, subprocess, configparser from pathlib import Path from jupyterhub.spawner import Spawner # ---------- 基础路径 ---------- SHARED_ROOT = Path("/home/zhangyi/jupyter-collection") # Git 仓库 USER_HOME = Path("/home/jupyter-{username}") CTRL_FILE = SHARED_ROOT / ".hub" / "resource_map.ini" # ---------- 工具:读控制表 ---------- def load_ctrl(): """返回 组->资源 和 组->用户 两个字典""" cfg = configparser.ConfigParser() cfg.read(CTRL_FILE) res_map = {g: [r.strip() for r in v.split(",")] for g, v in cfg.items("map")} usr_map = {g: [u.strip() for u in v.split(",")] for g, v in cfg.items("users")} return res_map, usr_map # ---------- post_auth_hook:自动建用户+加组 ---------- def ensure_user_and_groups(authenticator, handler, authentication): username = authentication['name'] res_map, usr_map = load_ctrl() wanted_groups = [g for g, users in usr_map.items() if username in users] if not wanted_groups: return authentication # 控制文件里没有就什么都不做 # 确保组存在 for g in wanted_groups: subprocess.run(["groupadd", "-f", g], check=False) # 系统账号不存在就创建(TLJH 会自己建 home,这里只确保用户存在) if subprocess.run(["id", "-u", username], capture_output=True).returncode != 0: subprocess.run([ "useradd", "-m", "-s", "/bin/bash", "-d", str(USER_HOME).format(username=username), username ], check=False) # 加组(幂等) for g in wanted_groups: subprocess.run(["usermod", "-aG", g, username], check=False) return authentication c.Authenticator.post_auth_hook = ensure_user_and_groups # ---------- pre_spawn_hook:只清 shared 并挂载 ---------- def prepare_user(spawner: Spawner): username = spawner.user.name user_dir = Path(str(USER_HOME).format(username=username)) # 1. 只清 shared 目录,保留用户其它文件 shared = user_dir / "shared" if shared.exists(): shutil.rmtree(shared) shared.mkdir(parents=True, exist_ok=True) shutil.chown(shared, user=username, group=username) # 2. 读控制表 res_map, _ = load_ctrl() # 3. 取系统组 try: pwnam = pwd.getpwnam(username) gid_list = os.getgrouplist(username, pwnam.pw_gid) except (KeyError, OSError): return user_groups = {grp.getgrgid(g).gr_name for g in gid_list} # 4. 组装 bind_mounts(SystemdSpawner 语法) bind_list = [] for grp_name in user_groups: for res in res_map.get(grp_name, []): res_path = SHARED_ROOT / res if res_path.is_dir(): bind_list.append({ 'source': str(res_path), 'target': str(shared / res), 'read-only': True, }) spawner.bind_mounts = bind_list c.Spawner.pre_spawn_hook = prepare_user