Coverage for src/keel/lock.py: 100%
91 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 18:07 +0000
1"""Single-host resource claims backed by atomic ``mkdir``.
3Every merge goes through the merge lock (a keel invariant) so concurrent ``ship``
4runs on the same checkout cannot race the branch tip. The merge lock is now one
5consumer of the generalized resource-claim primitive below: ``mkdir`` is atomic,
6so a resource directory is either created for one owner or already held.
7"""
9from __future__ import annotations
11import hashlib
12import json
13import re
14from collections.abc import Iterator
15from contextlib import contextmanager
16from dataclasses import asdict, dataclass
17from pathlib import Path
18from typing import Any
20SCHEMA_VERSION = "keel.resource-claim.v1"
class LockError(RuntimeError):
    """Signal that the merge lock could not be acquired because it is held."""
@dataclass(frozen=True)
class ClaimResult:
    """Immutable outcome record for one claim or release attempt.

    Every operation in this module reports through this shape: ``status``
    and ``reason`` say what happened, and ``granted`` is set only when a
    claim was actually acquired.
    """

    # Contract identifier stamped on every result.
    schema_version: str
    # Name of the resource the operation targeted.
    resource: str
    # Owner on whose behalf the operation ran.
    owner: str
    # Lock directory path, rendered as a string.
    path: str
    # True when the claim was acquired by ``owner``.
    granted: bool
    # Outcome keyword, e.g. "granted" or "denied".
    status: str
    # Short machine-readable explanation accompanying ``status``.
    reason: str
    # Owner recorded in the lock directory, when one could be read.
    holder: str | None = None

    def as_dict(self) -> dict[str, Any]:
        """Convert this result into a plain dict keyed by field name."""
        payload: dict[str, Any] = asdict(self)
        return payload
def contract_as_dict() -> dict[str, Any]:
    """Describe the resource-claim contract as a plain dictionary.

    A fresh dictionary is built on every call, so callers may mutate the
    returned value freely.
    """
    # Every status keyword a ClaimResult produced by this module may carry.
    outcome_statuses = ["granted", "denied", "released", "missing", "not-owner"]
    contract: dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "consumer_neutral": True,
        "deterministic": True,
        "stdlib_only": True,
        "scope": "single-host",
        "primitive": "mkdir",
        "deny_mode": "structured-feedback",
        "statuses": outcome_statuses,
        "merge_lock_consumer": True,
        "stale_recovery": "caller-owned",
    }
    return contract
def claim_resource(root: str | Path, resource: str, *, owner: str) -> ClaimResult:
    """Attempt to claim ``resource`` beneath ``root`` on behalf of ``owner``.

    Returns a ``ClaimResult``; a resource that is already claimed is reported
    as a denied result rather than raised as an error.
    """
    label = _clean(resource, "unknown-resource")
    lock_dir = resource_path(root, resource)
    return _claim_path(lock_dir, resource=label, owner=owner)
def release_resource(
    root: str | Path,
    resource: str,
    *,
    owner: str | None = None,
    best_effort: bool = False,
) -> ClaimResult:
    """Release the claim on ``resource`` beneath ``root``.

    When ``owner`` is given, the release is refused if the lock directory
    records a different owner. With ``best_effort`` set, filesystem errors
    raised while removing the claim are suppressed instead of propagated.
    """
    label = _clean(resource, "unknown-resource")
    lock_dir = resource_path(root, resource)
    return _release_path(lock_dir, resource=label, owner=owner, best_effort=best_effort)
@contextmanager
def resource_claim(root: str | Path, resource: str, *, owner: str) -> Iterator[ClaimResult]:
    """Claim ``resource`` for the ``with`` block and yield the claim result.

    The result is yielded whether or not the claim was granted, so callers
    must inspect ``granted`` themselves. A granted claim is released when the
    block exits, including exit by exception; a denied claim is left alone.
    """
    outcome = claim_resource(root, resource, owner=owner)
    if not outcome.granted:
        # Nothing was acquired, so there is nothing to release afterwards.
        yield outcome
        return
    try:
        yield outcome
    finally:
        release_resource(root, resource, owner=owner)
def resource_path(root: str | Path, resource: str) -> Path:
    """Map ``resource`` to its lock directory path beneath ``root``.

    The directory name is a readable lowercase slug of the resource name
    followed by the first 12 hex digits of the name's SHA-256, so names that
    collapse to the same slug still get distinct directories.
    """
    label = _clean(resource, "unknown-resource")
    # Collapse every run of characters outside the safe set into one dash.
    collapsed = re.sub(r"[^A-Za-z0-9_.-]+", "-", label)
    slug = collapsed.strip(".-").lower() or "resource"
    fingerprint = hashlib.sha256(label.encode("utf-8")).hexdigest()
    return Path(root) / f"{slug}-{fingerprint[:12]}.lock"
@contextmanager
def merge_lock(lock_dir: str | Path) -> Iterator[Path]:
    """Hold the merge lock at ``lock_dir`` while the ``with`` block runs.

    Yields the lock directory path. Raises ``LockError`` when the lock is
    already held. The lock is released on exit with filesystem errors
    suppressed, so a failed cleanup never masks an error from the block.
    """
    lock_path = Path(lock_dir)
    identity = {"resource": "merge", "owner": "merge-lock"}
    claim = _claim_path(lock_path, **identity)
    if not claim.granted:
        raise LockError(f"merge lock already held: {lock_path}")
    try:
        yield lock_path
    finally:
        _release_path(lock_path, best_effort=True, **identity)
def _claim_path(path: Path, *, resource: str, owner: str) -> ClaimResult:
    """Try to create ``path`` as a claim directory owned by ``owner``.

    Args:
        path: Lock directory to create; missing parents are created too.
        resource: Resource name to report in the result.
        owner: Requested owner; blank or non-string values fall back to
            ``"unknown-owner"``.

    Returns:
        A granted ``ClaimResult`` when the directory was created, or a
        denied one (carrying the recorded holder, if readable) when the
        path already exists.

    Raises:
        OSError: If the directory cannot be created for a reason other than
            already existing, or if the owner record cannot be written. In
            the latter case the new directory is removed again first.
    """
    clean_owner = _clean(owner, "unknown-owner")
    try:
        # mkdir is the atomic test-and-set: exactly one caller creates it.
        path.mkdir(parents=True)
    except FileExistsError:
        return ClaimResult(
            schema_version=SCHEMA_VERSION,
            resource=resource,
            owner=clean_owner,
            path=str(path),
            granted=False,
            status="denied",
            reason="resource-already-claimed",
            holder=_holder(path),
        )
    try:
        _write_owner(path, clean_owner)
    except OSError:
        # The caller gets an exception rather than a ClaimResult, so nothing
        # would ever release this directory; undo the claim before re-raising.
        try:
            (path / "owner.json").unlink(missing_ok=True)
            path.rmdir()
        except OSError:
            # Best-effort rollback: the write failure is the error to surface.
            pass
        raise
    return ClaimResult(
        schema_version=SCHEMA_VERSION,
        resource=resource,
        owner=clean_owner,
        path=str(path),
        granted=True,
        status="granted",
        reason="claim-acquired",
        holder=clean_owner,
    )
def _release_path(
    path: Path,
    *,
    resource: str,
    owner: str | None,
    best_effort: bool = False,
) -> ClaimResult:
    """Remove the claim directory at ``path`` and report what happened.

    Args:
        path: Lock directory to remove.
        resource: Resource name to report in the result.
        owner: Owner requesting the release, or ``None`` to release
            regardless of who holds the claim.
        best_effort: When true, ``OSError`` raised while deleting is
            suppressed instead of propagated.

    Returns:
        A ``ClaimResult`` with status ``"missing"`` when ``path`` does not
        exist, ``"not-owner"`` when a different owner is recorded, and
        ``"released"`` otherwise. ``granted`` is always false.

    Raises:
        OSError: If deletion fails and ``best_effort`` is false.
    """
    if owner is None:
        requester = "any-owner"
    else:
        requester = _clean(owner, "unknown-owner")

    def outcome(status: str, reason: str, holder: str | None = None) -> ClaimResult:
        # All release outcomes share everything except status/reason/holder.
        return ClaimResult(
            schema_version=SCHEMA_VERSION,
            resource=resource,
            owner=requester,
            path=str(path),
            granted=False,
            status=status,
            reason=reason,
            holder=holder,
        )

    if not path.exists():
        return outcome("missing", "resource-not-claimed")

    recorded = _holder(path)
    # A claim with no readable owner record may be released by anyone.
    held_by_other = owner is not None and recorded is not None and recorded != requester
    if held_by_other:
        return outcome("not-owner", "resource-held-by-different-owner", recorded)

    marker = path / "owner.json"
    try:
        if marker.exists():
            marker.unlink()
        path.rmdir()
    except OSError:
        if not best_effort:
            raise
        # NOTE(review): a suppressed failure still falls through to a
        # "released" result even though the directory may remain on disk.
    return outcome("released", "claim-released", recorded)
195def _write_owner(path: Path, owner: str) -> None:
196 (path / "owner.json").write_text(
197 json.dumps({"owner": owner}, sort_keys=True) + "\n",
198 encoding="utf-8",
199 )
202def _holder(path: Path) -> str | None:
203 owner_file = path / "owner.json"
204 if not owner_file.exists():
205 return None
206 try:
207 data = json.loads(owner_file.read_text(encoding="utf-8"))
208 except (OSError, json.JSONDecodeError):
209 return None
210 owner = data.get("owner") if isinstance(data, dict) else None
211 return owner if isinstance(owner, str) and owner.strip() else None
214def _clean(value: str | None, fallback: str) -> str:
215 return value.strip() if isinstance(value, str) and value.strip() else fallback