Coverage for src/keel/lock.py: 100%

91 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 18:07 +0000

1"""Single-host resource claims backed by atomic ``mkdir``. 

2 

3Every merge goes through the merge lock (a keel invariant) so concurrent ``ship`` 

4runs on the same checkout cannot race the branch tip. The merge lock is now one 

5consumer of the generalized resource-claim primitive below: ``mkdir`` is atomic, 

6so a resource directory is either created for one owner or already held. 

7""" 

8 

9from __future__ import annotations 

10 

11import hashlib 

12import json 

13import re 

14from collections.abc import Iterator 

15from contextlib import contextmanager 

16from dataclasses import asdict, dataclass 

17from pathlib import Path 

18from typing import Any 

19 

20SCHEMA_VERSION = "keel.resource-claim.v1" 

21 

22 

23class LockError(RuntimeError): 

24 """Raised when the merge lock is already held.""" 

25 

26 

27@dataclass(frozen=True) 

28class ClaimResult: 

29 """Structured result for a single-host resource claim operation.""" 

30 

31 schema_version: str 

32 resource: str 

33 owner: str 

34 path: str 

35 granted: bool 

36 status: str 

37 reason: str 

38 holder: str | None = None 

39 

40 def as_dict(self) -> dict[str, Any]: 

41 """Return a JSON-compatible deterministic representation.""" 

42 return asdict(self) 

43 

44 

45def contract_as_dict() -> dict[str, Any]: 

46 """Return the stable resource-claim contract.""" 

47 return { 

48 "schema_version": SCHEMA_VERSION, 

49 "consumer_neutral": True, 

50 "deterministic": True, 

51 "stdlib_only": True, 

52 "scope": "single-host", 

53 "primitive": "mkdir", 

54 "deny_mode": "structured-feedback", 

55 "statuses": ["granted", "denied", "released", "missing", "not-owner"], 

56 "merge_lock_consumer": True, 

57 "stale_recovery": "caller-owned", 

58 } 

59 

60 

61def claim_resource(root: str | Path, resource: str, *, owner: str) -> ClaimResult: 

62 """Claim a named resource under ``root`` for exactly one owner.""" 

63 path = resource_path(root, resource) 

64 return _claim_path(path, resource=_clean(resource, "unknown-resource"), owner=owner) 

65 

66 

67def release_resource( 

68 root: str | Path, 

69 resource: str, 

70 *, 

71 owner: str | None = None, 

72 best_effort: bool = False, 

73) -> ClaimResult: 

74 """Release a named resource claim, optionally requiring the owner to match.""" 

75 path = resource_path(root, resource) 

76 return _release_path( 

77 path, 

78 resource=_clean(resource, "unknown-resource"), 

79 owner=owner, 

80 best_effort=best_effort, 

81 ) 

82 

83 

84@contextmanager 

85def resource_claim(root: str | Path, resource: str, *, owner: str) -> Iterator[ClaimResult]: 

86 """Context manager that yields a structured claim result and releases on success.""" 

87 result = claim_resource(root, resource, owner=owner) 

88 try: 

89 yield result 

90 finally: 

91 if result.granted: 

92 release_resource(root, resource, owner=owner) 

93 

94 

95def resource_path(root: str | Path, resource: str) -> Path: 

96 """Return the deterministic lock directory path for ``resource``.""" 

97 name = _clean(resource, "unknown-resource") 

98 slug = re.sub(r"[^A-Za-z0-9_.-]+", "-", name).strip(".-").lower() 

99 slug = slug or "resource" 

100 digest = hashlib.sha256(name.encode("utf-8")).hexdigest()[:12] 

101 return Path(root) / f"{slug}-{digest}.lock" 

102 

103 

104@contextmanager 

105def merge_lock(lock_dir: str | Path) -> Iterator[Path]: 

106 """Acquire the merge lock for the duration of the ``with`` block.""" 

107 path = Path(lock_dir) 

108 result = _claim_path(path, resource="merge", owner="merge-lock") 

109 if not result.granted: 

110 raise LockError(f"merge lock already held: {path}") 

111 try: 

112 yield path 

113 finally: 

114 _release_path(path, resource="merge", owner="merge-lock", best_effort=True) 

115 

116 

117def _claim_path(path: Path, *, resource: str, owner: str) -> ClaimResult: 

118 clean_owner = _clean(owner, "unknown-owner") 

119 try: 

120 path.mkdir(parents=True) 

121 except FileExistsError: 

122 return ClaimResult( 

123 schema_version=SCHEMA_VERSION, 

124 resource=resource, 

125 owner=clean_owner, 

126 path=str(path), 

127 granted=False, 

128 status="denied", 

129 reason="resource-already-claimed", 

130 holder=_holder(path), 

131 ) 

132 _write_owner(path, clean_owner) 

133 return ClaimResult( 

134 schema_version=SCHEMA_VERSION, 

135 resource=resource, 

136 owner=clean_owner, 

137 path=str(path), 

138 granted=True, 

139 status="granted", 

140 reason="claim-acquired", 

141 holder=clean_owner, 

142 ) 

143 

144 

145def _release_path( 

146 path: Path, 

147 *, 

148 resource: str, 

149 owner: str | None, 

150 best_effort: bool = False, 

151) -> ClaimResult: 

152 clean_owner = _clean(owner, "unknown-owner") if owner is not None else "any-owner" 

153 if not path.exists(): 

154 return ClaimResult( 

155 schema_version=SCHEMA_VERSION, 

156 resource=resource, 

157 owner=clean_owner, 

158 path=str(path), 

159 granted=False, 

160 status="missing", 

161 reason="resource-not-claimed", 

162 ) 

163 holder = _holder(path) 

164 if owner is not None and holder is not None and holder != clean_owner: 

165 return ClaimResult( 

166 schema_version=SCHEMA_VERSION, 

167 resource=resource, 

168 owner=clean_owner, 

169 path=str(path), 

170 granted=False, 

171 status="not-owner", 

172 reason="resource-held-by-different-owner", 

173 holder=holder, 

174 ) 

175 try: 

176 owner_file = path / "owner.json" 

177 if owner_file.exists(): 

178 owner_file.unlink() 

179 path.rmdir() 

180 except OSError: 

181 if not best_effort: 

182 raise 

183 return ClaimResult( 

184 schema_version=SCHEMA_VERSION, 

185 resource=resource, 

186 owner=clean_owner, 

187 path=str(path), 

188 granted=False, 

189 status="released", 

190 reason="claim-released", 

191 holder=holder, 

192 ) 

193 

194 

195def _write_owner(path: Path, owner: str) -> None: 

196 (path / "owner.json").write_text( 

197 json.dumps({"owner": owner}, sort_keys=True) + "\n", 

198 encoding="utf-8", 

199 ) 

200 

201 

202def _holder(path: Path) -> str | None: 

203 owner_file = path / "owner.json" 

204 if not owner_file.exists(): 

205 return None 

206 try: 

207 data = json.loads(owner_file.read_text(encoding="utf-8")) 

208 except (OSError, json.JSONDecodeError): 

209 return None 

210 owner = data.get("owner") if isinstance(data, dict) else None 

211 return owner if isinstance(owner, str) and owner.strip() else None 

212 

213 

214def _clean(value: str | None, fallback: str) -> str: 

215 return value.strip() if isinstance(value, str) and value.strip() else fallback