Coverage for src / main.py: 97%

316 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-30 15:00 +0000

1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelomas@gmail.com> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import json 

7import re 

8from pathlib import Path 

9from typing import Optional 

10from urllib.parse import urlparse 

11 

12from rdflib import BNode, Graph, Literal, Namespace, URIRef 

13from rdflib.term import Node 

14from rdflib.collection import Collection 

15from rdflib.namespace import OWL, RDF, XSD 

16 

# Default base IRI for generated shapes when the input is a directory of modules.
SHAPES_BASE = "https://w3id.org/skg-if/shapes/"

# Predicate whose literal value carries the textual property listing of a class.
DC_DESCRIPTION = URIRef("http://purl.org/dc/elements/1.1/description")

# One property line of a class description, e.g. ``ex:name -[1..N]-> xsd:string``.
# Groups: (1) property name, (2) minimum / exact cardinality, (3) optional ``..``,
# (4) optional maximum cardinality, (5) target name or ``{...}`` value list.
PROPERTY_PATTERN = (
    r'([\w:-]+) '
    r'-\[(\d+|[*N])(\.\.)?(\d+|[*N])?]->'
    r'\s+([\w:-]+|\{[^}]+\})'
)

20 

21 

22def _is_url(source: str) -> bool: 

23 return source.startswith('http://') or source.startswith('https://') 

24 

25 

def _get_ontology_iri(g: Graph) -> Optional[str]:
    """Return the first subject typed ``owl:Ontology`` in *g* as a string.

    Returns None when the graph declares no such subject.
    """
    first_ontology = next(iter(g.subjects(RDF.type, OWL.Ontology, unique=True)), None)
    return None if first_ontology is None else str(first_ontology)

30 

31 

32def _get_ext_module_name(source: str) -> Optional[str]: 

33 for part in Path(source).resolve().parts: 

34 if part.startswith("ext-") and len(part) > 4: 

35 return part[4:] 

36 return None 

37 

38 

def _derive_module_name(source: str, g: Graph) -> str:
    """Pick a module name for a single ontology source.

    Candidates are tried in this order:

    1. for local paths, the ``ext-<name>`` path component, if any;
    2. the last non-empty path segment of the ontology IRI declared in *g*;
    3. the last path segment of the URL, or the file stem of a local path,
       with one trailing ``.extension`` removed;
    4. the literal ``"ontology"``.

    Args:
        source: URL or filesystem path the graph was loaded from.
        g: The parsed ontology graph.

    Returns:
        A non-empty module name.
    """
    if not _is_url(source):
        ext_name = _get_ext_module_name(source)
        if ext_name:
            return ext_name

    iri = _get_ontology_iri(g)
    if iri:
        iri_segments = [p for p in urlparse(iri).path.split('/') if p]
        if iri_segments:
            return iri_segments[-1]

    if _is_url(source):
        url_segments = [p for p in urlparse(source).path.split('/') if p]
        name = url_segments[-1] if url_segments else ""
    else:
        name = Path(source).stem
    if '.' in name:
        name = name.rsplit('.', 1)[0]
    # Dot-files such as ".hidden" strip down to an empty string; never hand
    # back an empty module name, use the generic fallback instead.
    return name or "ontology"

61 

62 

def _derive_shapes_base(source: str, g: Graph) -> str:
    """Return the shapes base IRI derived from the ontology IRI in *g*.

    The result is the ontology IRI without trailing slashes followed by
    ``/shapes/``. When *g* declares no ontology the placeholder
    ``http://example.org/shapes/`` is returned. *source* is not consulted.
    """
    ontology_iri = _get_ontology_iri(g)
    if not ontology_iri:
        return "http://example.org/shapes/"
    return ontology_iri.rstrip('/') + '/shapes/'

68 

69 

# Matches a Turtle-style ``@prefix name: <iri> .`` declaration embedded in text.
# Group 1 is the prefix name, group 2 the namespace IRI. Hyphens are accepted in
# the prefix name (e.g. ``skg-if``) so that every prefix PROPERTY_PATTERN can
# reference may also be declared.
PREFIX_PATTERN = re.compile(r'@prefix\s+([\w-]+):\s+<([^>]+)>\s*\.')

71 

72 

def _build_uri_namespace_map(g: Graph) -> dict[str, str]:
    """Map each local name used in *g* to the namespace it appears under.

    Every URIRef in subject, predicate or object position is cut after its
    last ``#`` (or, when it has no ``#``, after its last ``/``). URIs with
    neither separator are ignored. If a local name occurs under several
    namespaces, the one seen last during iteration is kept.
    """
    mapping: dict[str, str] = {}
    for subj, pred, obj in g:
        for node in (subj, pred, obj):
            if not isinstance(node, URIRef):
                continue
            text = str(node)
            separator = '#' if '#' in text else '/'
            cut = text.rfind(separator) + 1
            if cut == 0:
                # Neither '#' nor '/' present: nothing to split on.
                continue
            local_name, namespace = text[cut:], text[:cut]
            if local_name and namespace:
                mapping[local_name] = namespace
    return mapping

91 

92 

def _extract_prefixes_from_literals(g: Graph) -> dict[str, str]:
    """Collect prefix declarations that appear inside literal values of *g*.

    Every Literal object is scanned with PREFIX_PATTERN; each match adds a
    ``prefix -> namespace`` entry, later matches overwriting earlier ones.
    """
    found: dict[str, str] = {}
    literal_texts = (str(obj) for _, _, obj in g if isinstance(obj, Literal))
    for text in literal_texts:
        found.update(
            (declaration.group(1), declaration.group(2))
            for declaration in PREFIX_PATTERN.finditer(text)
        )
    return found

101 

102 

103def _resolve_namespace(prefix: str, local_name: str, g: Graph, 

104 uri_ns_map: dict[str, str], 

105 literal_prefix_map: dict[str, str]) -> Optional[str]: 

106 ns = g.store.namespace(prefix) 

107 if ns: 

108 return str(ns) 

109 if local_name in uri_ns_map: 

110 return uri_ns_map[local_name] 

111 if prefix in literal_prefix_map: 

112 return literal_prefix_map[prefix] 

113 return None 

114 

115 

def _detect_root_classes(g: Graph, described_classes: set[str]) -> set[str]:
    """Return the described classes that no property listing points at.

    Each ``owl:Class`` in *g* whose description contains the marker text
    "The properties that can be used" is scanned; every property line that
    matches PROPERTY_PATTERN contributes its target class to the set of
    referenced classes. Controlled vocabularies (``{...}``), ``rdfs:`` /
    ``xsd:`` targets, unparseable lines and unresolvable names are skipped.

    Args:
        g: Graph holding the class descriptions.
        described_classes: URIs of all classes that have a property listing.

    Returns:
        ``described_classes`` minus every class referenced as a target.
    """
    uri_ns_map = _build_uri_namespace_map(g)
    literal_prefix_map = _extract_prefixes_from_literals(g)
    referenced: set[str] = set()
    for cls in g.subjects(RDF.type, OWL.Class, unique=True):
        desc = g.value(cls, DC_DESCRIPTION)
        if not desc or "The properties that can be used" not in str(desc):
            continue
        # The first chunk is the introductory sentence, not a property line.
        properties = [p for p in re.split(r'\n[*-] ', str(desc)) if p.strip()][1:]
        for prop in properties:
            match = re.match(PROPERTY_PATTERN, prop.strip())
            if not match:
                continue
            target = match.group(5)
            if target.startswith('{'):
                continue
            if ':' in target:
                # Split on the first colon only: PROPERTY_PATTERN admits names
                # with several colons, which must not abort root detection.
                target_prefix, target_local = target.split(':', 1)
                if target_prefix in ('rdfs', 'xsd'):
                    continue
                target_ns = _resolve_namespace(
                    target_prefix, target_local, g, uri_ns_map, literal_prefix_map)
            else:
                target_local = target
                target_ns = uri_ns_map.get(target)
            if target_ns:
                referenced.add(target_ns + target_local)
    return described_classes - referenced

143 

144 

def load_ontology_by_module(path: str) -> dict[str, Graph]:
    """Load one ontology graph per sub-directory of *path*.

    Every direct sub-directory except ``resources`` is treated as a module.
    Within a module the file to parse is chosen by extension priority
    (``.ttl``, ``.rdf``, ``.owl``, ``.n3``, ``.nt``, ``.jsonld``) and, among
    files of the same extension, by sorted path, so the choice does not
    depend on filesystem listing order. Directories with no matching file
    are omitted.

    Args:
        path: Directory containing the module sub-directories.

    Returns:
        Mapping of module directory name to its parsed graph.
    """
    patterns = ("*.ttl", "*.rdf", "*.owl", "*.n3", "*.nt", "*.jsonld")
    modules: dict[str, Graph] = {}

    module_dirs = sorted(
        d for d in Path(path).iterdir() if d.is_dir() and d.name != "resources"
    )
    for module_dir in module_dirs:
        rdf_files = [
            rdf_file
            for pattern in patterns
            # glob() yields entries in arbitrary order; sort for determinism.
            for rdf_file in sorted(module_dir.glob(pattern))
        ]
        if rdf_files:
            g = Graph()
            g.parse(rdf_files[0])
            modules[module_dir.name] = g

    return modules

158 

159 

def get_class_local_name(class_uri: str) -> str:
    """Return the part of *class_uri* after its last ``#``.

    When the URI contains no ``#`` the part after the last ``/`` is returned
    instead; a string with neither separator is returned unchanged.
    """
    separator = '#' if '#' in class_uri else '/'
    return class_uri.rpartition(separator)[2]

164 

165 

def _load_source(input_source: str) -> tuple[dict[str, Graph], bool]:
    """Load the ontology graph(s) behind *input_source*.

    A local directory is loaded module by module and reported as modular.
    A URL or any other local path is parsed as a single graph keyed by its
    derived module name and reported as non-modular.

    Returns:
        ``(modules, is_modular)``.
    """
    # URLs are never probed on the filesystem.
    if not _is_url(input_source) and Path(input_source).is_dir():
        return load_ontology_by_module(input_source), True

    graph = Graph()
    graph.parse(input_source)
    return {_derive_module_name(input_source, graph): graph}, False

181 

182 

183def _resolve_shapes_base(input_source: str, modules: dict[str, Graph], is_modular: bool, shapes_base: Optional[str]) -> str: 

184 if shapes_base: 

185 return shapes_base 

186 if is_modular: 

187 return SHAPES_BASE 

188 first_g = next(iter(modules.values())) 

189 return _derive_shapes_base(input_source, first_g) 

190 

191 

def _build_class_to_modules(modules: dict[str, Graph]) -> dict[str, list[str]]:
    """Index described classes by the modules that describe them.

    A class counts as described when it is typed ``owl:Class`` and its
    description contains the text "The properties that can be used".

    Returns:
        Mapping of class URI to the module names, in iteration order of
        *modules*, whose graph describes that class.
    """
    index: dict[str, list[str]] = {}
    for module_name, graph in modules.items():
        for cls in graph.subjects(RDF.type, OWL.Class, unique=True):
            description = graph.value(cls, DC_DESCRIPTION)
            if not description or "The properties that can be used" not in str(description):
                continue
            index.setdefault(str(cls), []).append(module_name)
    return index

203 

204 

205def _resolve_root_class_uris(modules: dict[str, Graph], class_to_modules: dict[str, list[str]], root_classes: Optional[dict[str, str]] = None) -> set[str]: 

206 if root_classes is not None: 

207 return set(root_classes.values()) 

208 all_graphs = Graph() 

209 for g in modules.values(): 

210 for triple in g: 

211 all_graphs.add(triple) 

212 for g in modules.values(): 

213 for prefix, namespace in g.namespaces(): 

214 all_graphs.bind(prefix, namespace) 

215 return _detect_root_classes(all_graphs, set(class_to_modules.keys())) 

216 

217 

218def _bind_namespaces(shacl: Graph, modules: dict[str, Graph]) -> None: 

219 for _, g in modules.items(): 

220 for prefix, namespace in g.namespaces(): 

221 shacl.bind(prefix, namespace) 

222 

223 

def _bind_shape_namespaces(shacl: Graph, modules: dict[str, Graph], shapes_base: str, is_modular: bool) -> None:
    """Bind prefixes for the generated shape namespaces onto *shacl*.

    Modular input gets one ``skg_sh_<module>`` prefix per module, bound to
    ``<shapes_base><module>/``. Single-graph input gets one ``<module>_sh``
    prefix bound to *shapes_base*. Hyphens in prefixes become underscores.
    """
    if not is_modular:
        only_module = next(iter(modules))
        shacl.bind(only_module.replace("-", "_") + "_sh", Namespace(shapes_base))
        return

    for module_name in modules:
        prefix = f"skg-sh-{module_name}".replace("-", "_")
        shacl.bind(prefix, Namespace(shapes_base + module_name + "/"))

234 

235 

def _parse_property(prop_text: str, class_uri: str, g: Graph,
                    uri_ns_map: dict[str, str],
                    literal_prefix_map: dict[str, str]) -> tuple[URIRef, str, str | None, str | None, str, str]:
    """Parse one property line of a class description.

    Args:
        prop_text: The stripped property line.
        class_uri: URI of the class being processed (used in error messages).
        g: Graph the class comes from, consulted for prefix bindings.
        uri_ns_map: Local-name to namespace map for *g*.
        literal_prefix_map: Prefixes declared inside literals of *g*.

    Returns:
        ``(property URI, min/exact cardinality, range separator or None,
        max cardinality or None, target text, original line)``.

    Raises:
        ValueError: If the line does not match PROPERTY_PATTERN, the property
            name carries no prefix, or the prefix cannot be resolved.
    """
    match = re.match(PROPERTY_PATTERN, prop_text)
    if not match:
        raise ValueError(f"Invalid property format in {class_uri}: {prop_text}")

    prop_name, card_min, range_sep, card_max, target = match.groups()

    # PROPERTY_PATTERN also matches names without a colon; report that with
    # context instead of failing on tuple unpacking.
    prop_prefix, separator, prop_local = prop_name.partition(':')
    if not separator:
        raise ValueError(f"Property name '{prop_name}' has no prefix in {class_uri}: {prop_text}")

    prop_ns = _resolve_namespace(prop_prefix, prop_local, g, uri_ns_map, literal_prefix_map)
    if not prop_ns:
        raise ValueError(f"Unknown prefix '{prop_prefix}' in {class_uri}: {prop_text}")

    prop_uri = URIRef(prop_ns + prop_local)
    return prop_uri, card_min, range_sep, card_max, target, prop_text

252 

253 

254def _resolve_target(target: str, class_uri: str, prop_text: str, g: Graph, 

255 class_to_modules: dict[str, list[str]], module_name: str, 

256 shapes_base: str, is_modular: bool, SH: Namespace, 

257 uri_ns_map: dict[str, str], 

258 literal_prefix_map: dict[str, str]) -> tuple[str, URIRef]: 

259 if ':' in target: 

260 target_prefix, target_local = target.split(':') 

261 target_ns = _resolve_namespace(target_prefix, target_local, g, uri_ns_map, literal_prefix_map) 

262 if not target_ns: 

263 raise ValueError(f"Unknown prefix '{target_prefix}' in {class_uri}: {prop_text}") 

264 else: 

265 target_local = target 

266 target_ns = uri_ns_map.get(target) 

267 if not target_ns: 

268 raise ValueError(f"Cannot resolve unqualified name '{target}' in {class_uri}: {prop_text}") 

269 

270 if target in ("rdfs:Literal", "rdfs:langString"): 

271 return 'nodeKind', SH.Literal 

272 if target.startswith("xsd:"): 

273 return 'datatype', URIRef(f"http://www.w3.org/2001/XMLSchema#{target_local}") 

274 

275 target_uri = URIRef(target_ns + target_local) 

276 target_class_uri = str(target_uri) 

277 

278 if target_class_uri in class_to_modules: 

279 target_modules = class_to_modules[target_class_uri] 

280 target_module = module_name if module_name in target_modules else sorted(target_modules)[0] 

281 target_shape_ns = shapes_base + target_module + "/" if is_modular else shapes_base 

282 return 'node', URIRef(target_shape_ns + target_local + "Shape") 

283 return 'nodeKind', SH.BlankNodeOrIRI 

284 

285 

286def _resolve_controlled_vocabulary(target: str, class_uri: str, prop_text: str, 

287 g: Graph, uri_ns_map: dict[str, str], 

288 literal_prefix_map: dict[str, str]) -> list[Node]: 

289 values = target.strip('{}').split() 

290 uris: list[Node] = [] 

291 for val in values: 

292 if val.startswith('http://') or val.startswith('https://'): 

293 uris.append(URIRef(val)) 

294 elif ':' not in val: 

295 ns = uri_ns_map.get(val) 

296 if not ns: 

297 raise ValueError(f"Cannot resolve unqualified name '{val}' in {class_uri}: {prop_text}") 

298 uris.append(URIRef(ns + val)) 

299 else: 

300 prefix, local = val.split(':', 1) 

301 ns = _resolve_namespace(prefix, local, g, uri_ns_map, literal_prefix_map) 

302 if not ns: 

303 raise ValueError(f"Unknown prefix '{prefix}' in {class_uri}: {prop_text}") 

304 uris.append(URIRef(ns + local)) 

305 return uris 

306 

307 

308def _emit_cardinality(bnode: BNode, card_min: str, range_sep: str | None, 

309 card_max: str | None, shacl: Graph, SH: Namespace) -> None: 

310 if range_sep is None and card_min not in ['*', 'N']: 

311 exact_card = int(card_min) 

312 shacl.add((bnode, SH.minCount, Literal(exact_card, datatype=XSD.integer))) 

313 shacl.add((bnode, SH.maxCount, Literal(exact_card, datatype=XSD.integer))) 

314 else: 

315 if card_min and card_min not in ['*', 'N']: 

316 shacl.add((bnode, SH.minCount, Literal(int(card_min), datatype=XSD.integer))) 

317 if card_max and card_max not in ['*', 'N']: 

318 shacl.add((bnode, SH.maxCount, Literal(int(card_max), datatype=XSD.integer))) 

319 

320 

def _emit_properties(parsed: list[tuple[URIRef, str, str | None, str | None, str, str]],
                     class_uri: str, g: Graph, shape_uri: URIRef,
                     class_to_modules: dict[str, list[str]], module_name: str,
                     shapes_base: str, is_modular: bool, shacl: Graph, SH: Namespace,
                     uri_ns_map: dict[str, str],
                     literal_prefix_map: dict[str, str]) -> None:
    """Emit one ``sh:property`` shape per distinct property of a class.

    Entries in *parsed* that share a property URI are grouped. The
    cardinality of the first entry of a group is applied to the property
    shape. A group with one entry gets its constraint directly on the
    property shape; a group with several entries gets an ``sh:or`` list with
    one member per entry. Controlled vocabularies (``{...}`` targets) become
    ``sh:in`` lists in both cases.

    Args:
        parsed: Tuples as returned by ``_parse_property``.
        class_uri: URI of the class being processed (used in error messages).
        g: Graph the class comes from.
        shape_uri: Node shape that receives the property shapes.
        class_to_modules: Described class URI -> modules describing it.
        module_name: Module currently being processed.
        shapes_base: Base IRI of the generated shapes.
        is_modular: Whether shapes live in per-module namespaces.
        shacl: Output graph.
        SH: The SHACL namespace.
        uri_ns_map: Local-name to namespace map for *g*.
        literal_prefix_map: Prefixes declared inside literals of *g*.

    Raises:
        ValueError: Propagated from target / vocabulary resolution.
    """
    grouped: dict[URIRef, list[tuple[str, str | None, str | None, str, str]]] = {}
    for prop_uri, card_min, range_sep, card_max, target, prop_text in parsed:
        grouped.setdefault(prop_uri, []).append((card_min, range_sep, card_max, target, prop_text))

    def add_target_constraint(subject: BNode, target: str, prop_text: str) -> None:
        # Attach the constraint for one target to *subject*.
        if target.startswith('{'):
            vocab_uris = _resolve_controlled_vocabulary(
                target, class_uri, prop_text, g, uri_ns_map, literal_prefix_map)
            list_node = BNode()
            Collection(shacl, list_node, vocab_uris)
            shacl.add((subject, SH['in'], list_node))
            return
        constraint_type, constraint_value = _resolve_target(
            target, class_uri, prop_text, g, class_to_modules, module_name,
            shapes_base, is_modular, SH, uri_ns_map, literal_prefix_map)
        shacl.add((subject, SH[constraint_type], constraint_value))

    for prop_uri, entries in grouped.items():
        bnode = BNode()
        shacl.add((shape_uri, SH.property, bnode))
        shacl.add((bnode, SH.path, prop_uri))

        card_min, range_sep, card_max, _, _ = entries[0]
        _emit_cardinality(bnode, card_min, range_sep, card_max, shacl, SH)

        if len(entries) == 1:
            add_target_constraint(bnode, entries[0][3], entries[0][4])
            continue

        # Several alternative targets for the same property: sh:or over one
        # anonymous shape per alternative. Value lists are handled here too,
        # rather than being passed to _resolve_target as if they were names.
        or_members = []
        for _, _, _, target, prop_text in entries:
            member = BNode()
            add_target_constraint(member, target, prop_text)
            or_members.append(member)
        or_list = BNode()
        Collection(shacl, or_list, or_members)
        shacl.add((bnode, SH['or'], or_list))

364 

365 

def create_shacl_shapes(input_source: str | Path, shapes_base: Optional[str] = None, root_classes: Optional[dict[str, str]] = None) -> Graph:
    """Build a SHACL shapes graph from the ontology at *input_source*.

    Every ``owl:Class`` whose description contains "The properties that can
    be used" yields a ``sh:NodeShape`` named ``<LocalName>Shape``. Root
    classes additionally receive ``sh:targetClass``. The bullet lines of the
    description (introduced by ``* `` or ``- `` at line start) are parsed
    into property shapes.

    Args:
        input_source: File path, directory of modules, or URL.
        shapes_base: Base IRI for shapes; resolved automatically when omitted.
        root_classes: Optional mapping whose values are the root class URIs;
            when omitted the roots are detected from the ontology.

    Returns:
        The graph holding the generated shapes.

    Raises:
        ValueError: If a property line cannot be parsed or resolved.
    """
    source = str(input_source)
    modules, is_modular = _load_source(source)
    base = _resolve_shapes_base(source, modules, is_modular, shapes_base)

    SH = Namespace("http://www.w3.org/ns/shacl#")
    shacl = Graph()
    shacl.bind('sh', SH)
    _bind_namespaces(shacl, modules)

    class_to_modules = _build_class_to_modules(modules)
    root_class_uris = _resolve_root_class_uris(modules, class_to_modules, root_classes)
    _bind_shape_namespaces(shacl, modules, base, is_modular)

    for module_name, g in modules.items():
        # Modular input keeps each module's shapes in its own sub-namespace.
        shape_prefix = base + module_name + "/" if is_modular else base
        uri_ns_map = _build_uri_namespace_map(g)
        literal_prefix_map = _extract_prefixes_from_literals(g)

        for cls in g.subjects(RDF.type, OWL.Class, unique=True):
            description = g.value(cls, DC_DESCRIPTION)
            if not description:
                continue
            description_text = str(description)
            if "The properties that can be used" not in description_text:
                continue

            class_uri = str(cls)
            shape_uri = URIRef(shape_prefix + get_class_local_name(class_uri) + "Shape")
            shacl.add((shape_uri, RDF.type, SH.NodeShape))
            if class_uri in root_class_uris:
                shacl.add((shape_uri, SH.targetClass, cls))

            # The first non-empty chunk is the introductory sentence.
            chunks = [c for c in re.split(r'\n[*-] ', description_text) if c.strip()]
            parsed = [
                _parse_property(line.strip(), class_uri, g, uri_ns_map, literal_prefix_map)
                for line in chunks[1:]
            ]

            _emit_properties(parsed, class_uri, g, shape_uri,
                             class_to_modules, module_name,
                             base, is_modular, shacl, SH,
                             uri_ns_map, literal_prefix_map)

    return shacl

418 

419 

def main():
    """Command-line entry point: read an ontology and write its SHACL shapes.

    Positional arguments are the input ontology and the output file. The
    optional ``--shapes-base`` and ``--root-classes`` flags are forwarded to
    ``create_shacl_shapes``; the latter names a JSON file that is loaded
    first. The result is serialized as UTF-8 Turtle.
    """
    cli = argparse.ArgumentParser(description='Extract SHACL shapes from OWL ontologies')
    cli.add_argument('input', help='Input ontology (file path, directory, or URL)')
    cli.add_argument('output', help='Output SHACL file path')
    cli.add_argument('--shapes-base', help='Base URL for shapes namespace')
    cli.add_argument('--root-classes', help='JSON file mapping module names to root class URIs')
    options = cli.parse_args()

    if options.root_classes:
        with open(options.root_classes, encoding='utf-8') as handle:
            root_classes = json.load(handle)
    else:
        root_classes = None

    shapes = create_shacl_shapes(
        options.input,
        shapes_base=options.shapes_base,
        root_classes=root_classes,
    )
    shapes.serialize(destination=options.output, format="turtle", encoding="utf-8")


if __name__ == "__main__":  # pragma: no cover
    main()