Coverage for src / main.py: 97%
316 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-30 15:00 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-30 15:00 +0000
1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelomas@gmail.com>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import json
7import re
8from pathlib import Path
9from typing import Optional
10from urllib.parse import urlparse
12from rdflib import BNode, Graph, Literal, Namespace, URIRef
13from rdflib.term import Node
14from rdflib.collection import Collection
15from rdflib.namespace import OWL, RDF, XSD
# Base IRI returned for the shapes namespace when the input is a directory of
# modules and the caller supplies no explicit base.
SHAPES_BASE = "https://w3id.org/skg-if/shapes/"
# Predicate whose value holds the textual property listing that the shape
# generator parses for each class.
DC_DESCRIPTION = URIRef("http://purl.org/dc/elements/1.1/description")
# One property line of the form `<property> -[<min>(..)(<max>)]-> <target>`.
# Groups: 1 property name, 2 minimum (digits, `*` or `N`), 3 the `..` range
# separator (optional), 4 maximum (optional), 5 target name or `{...}` list.
PROPERTY_PATTERN = r'([\w:-]+) -\[(\d+|[*N])(\.\.)?(\d+|[*N])?]->\s+([\w:-]+|\{[^}]+\})'
22def _is_url(source: str) -> bool:
23 return source.startswith('http://') or source.startswith('https://')
def _get_ontology_iri(g: Graph) -> Optional[str]:
    """Return the IRI of a subject typed ``owl:Ontology`` in *g*, if any.

    Only the first subject yielded by the graph is used; ``None`` is returned
    when the graph declares no ontology.
    """
    ontology = next(g.subjects(RDF.type, OWL.Ontology, unique=True), None)
    return None if ontology is None else str(ontology)
32def _get_ext_module_name(source: str) -> Optional[str]:
33 for part in Path(source).resolve().parts:
34 if part.startswith("ext-") and len(part) > 4:
35 return part[4:]
36 return None
def _derive_module_name(source: str, g: Graph) -> str:
    """Choose a module name for an ontology loaded from *source*.

    Candidates are tried in this order:

    1. for local paths, the name taken from an ``ext-<name>`` path component;
    2. the last path segment of the ontology IRI declared in *g*;
    3. the last path segment of the URL, or the stem of the local file, with
       one trailing ``.suffix`` removed;
    4. the literal ``"ontology"``.

    Args:
        source: URL or filesystem path the graph was loaded from.
        g: The parsed ontology graph.

    Returns:
        A non-empty module name.
    """
    from_url = _is_url(source)
    if not from_url:
        ext_name = _get_ext_module_name(source)
        if ext_name:
            return ext_name

    iri = _get_ontology_iri(g)
    if iri:
        segments = [s for s in urlparse(iri).path.split('/') if s]
        if segments:
            return segments[-1]

    if from_url:
        segments = [s for s in urlparse(source).path.split('/') if s]
        name = segments[-1] if segments else ""
    else:
        name = Path(source).stem
    if '.' in name:
        name = name.rsplit('.', 1)[0]
    # A name such as ".ttl" collapses to an empty string once the suffix is
    # removed; fall back to the default rather than return an empty name.
    return name or "ontology"
def _derive_shapes_base(source: str, g: Graph) -> str:
    """Return the base IRI under which shapes for *g* are minted.

    The base is the ontology IRI with trailing slashes removed, followed by
    ``/shapes/``. When *g* declares no ontology, a fixed ``example.org`` base
    is returned. *source* is accepted but not consulted.
    """
    iri = _get_ontology_iri(g)
    if not iri:
        return "http://example.org/shapes/"
    return f"{iri.rstrip('/')}/shapes/"
# Matches a Turtle-style `@prefix name: <iri> .` declaration embedded in text.
# Group 1 is the prefix name, group 2 the namespace IRI.
PREFIX_PATTERN = re.compile(r'@prefix\s+(\w+):\s+<([^>]+)>\s*\.')
def _build_uri_namespace_map(g: Graph) -> dict[str, str]:
    """Map every local name used in *g* to the namespace it appears under.

    Each URI in subject, predicate or object position is split after its last
    ``#`` or, when it has none, after its last ``/``. URIs with neither
    separator, or with an empty local name, are ignored. When a local name
    occurs under several namespaces, the one seen last is kept.
    """
    namespace_by_local: dict[str, str] = {}
    for triple in g:
        for node in triple:
            if not isinstance(node, URIRef):
                continue
            text = str(node)
            # A fragment separator takes precedence over a path separator.
            cut = text.rfind('#')
            if cut < 0:
                cut = text.rfind('/')
            if cut < 0:
                continue
            namespace, local = text[:cut + 1], text[cut + 1:]
            if local and namespace:
                namespace_by_local[local] = namespace
    return namespace_by_local
def _extract_prefixes_from_literals(g: Graph) -> dict[str, str]:
    """Collect ``@prefix`` declarations found inside literal values of *g*.

    Returns a mapping from prefix name to namespace IRI. If a prefix is
    declared more than once, the declaration encountered last is kept.
    """
    return {
        found.group(1): found.group(2)
        for _, _, obj in g
        if isinstance(obj, Literal)
        for found in PREFIX_PATTERN.finditer(str(obj))
    }
def _resolve_namespace(prefix: str, local_name: str, g: Graph,
                       uri_ns_map: dict[str, str],
                       literal_prefix_map: dict[str, str]) -> Optional[str]:
    """Find the namespace IRI for a ``prefix:local_name`` pair.

    Sources are consulted in order: prefixes bound in the graph's store, the
    namespace under which *local_name* is used in the graph, and ``@prefix``
    declarations found in literals. Returns ``None`` if all three fail.
    """
    bound = g.store.namespace(prefix)
    if bound:
        return str(bound)
    # The two fallback tables are keyed differently: one by local name, the
    # other by prefix.
    for key, table in ((local_name, uri_ns_map), (prefix, literal_prefix_map)):
        if key in table:
            return table[key]
    return None
def _detect_root_classes(g: Graph, described_classes: set[str]) -> set[str]:
    """Return the described classes that no property listing points to.

    Every ``owl:Class`` whose description contains the property listing marker
    is scanned. The target of each well-formed property line is resolved to a
    class URI and recorded as referenced. Controlled vocabularies (``{...}``)
    and ``rdfs:`` / ``xsd:`` targets are ignored, as are lines that do not
    match the property pattern and targets whose namespace cannot be resolved.

    Args:
        g: Graph holding the ontology (or the merge of several modules).
        described_classes: URIs of the classes that carry a property listing.

    Returns:
        The subset of *described_classes* never used as a property target.
    """
    uri_ns_map = _build_uri_namespace_map(g)
    literal_prefix_map = _extract_prefixes_from_literals(g)
    referenced: set[str] = set()
    for cls in g.subjects(RDF.type, OWL.Class, unique=True):
        desc = g.value(cls, DC_DESCRIPTION)
        if not desc or "The properties that can be used" not in str(desc):
            continue
        # The first chunk is the introductory sentence, not a property line.
        properties = [p for p in re.split(r'\n[*-] ', str(desc)) if p.strip()][1:]
        for prop in properties:
            match = re.match(PROPERTY_PATTERN, prop.strip())
            if not match:
                continue
            target = match.group(5)
            if target.startswith('{'):
                continue
            if ':' in target:
                # Split on the first colon only: the pattern admits names with
                # several colons, and a plain split() would fail to unpack.
                target_prefix, target_local = target.split(':', 1)
                if target_prefix in ('rdfs', 'xsd'):
                    continue
                target_ns = _resolve_namespace(
                    target_prefix, target_local, g, uri_ns_map, literal_prefix_map)
            else:
                target_local = target
                target_ns = uri_ns_map.get(target)
            if target_ns:
                referenced.add(target_ns + target_local)
    return described_classes - referenced
def load_ontology_by_module(path: str) -> dict[str, Graph]:
    """Load one ontology graph per module sub-directory of *path*.

    Every direct sub-directory except ``resources`` is treated as a module.
    Within a module, candidate files are looked up by extension in the order
    ``.ttl``, ``.rdf``, ``.owl``, ``.n3``, ``.nt``, ``.jsonld``, and only the
    first candidate is parsed. Directories without any such file are skipped.

    Args:
        path: Directory that contains the module sub-directories.

    Returns:
        Mapping from module directory name to its parsed graph, with modules
        inserted in sorted order.
    """
    modules: dict[str, Graph] = {}
    module_dirs = (d for d in Path(path).iterdir() if d.is_dir() and d.name != "resources")
    for module_dir in sorted(module_dirs):
        # glob() yields files in arbitrary order; sorting within each
        # extension makes the choice of "first file" reproducible.
        rdf_files = [
            rdf_file
            for pattern in ("*.ttl", "*.rdf", "*.owl", "*.n3", "*.nt", "*.jsonld")
            for rdf_file in sorted(module_dir.glob(pattern))
        ]
        if rdf_files:
            g = Graph()
            g.parse(rdf_files[0])
            modules[module_dir.name] = g
    return modules
def get_class_local_name(class_uri: str) -> str:
    """Return the part of *class_uri* after its last ``#``, or else its last ``/``.

    A URI containing neither separator is returned unchanged.
    """
    separator = '#' if '#' in class_uri else '/'
    return class_uri.rpartition(separator)[2]
def _load_source(input_source: str) -> tuple[dict[str, Graph], bool]:
    """Load the ontology graph(s) designated by *input_source*.

    A local directory is loaded module by module. A URL or a single file is
    parsed into one graph, keyed by a derived module name.

    Returns:
        ``(modules, is_modular)`` where *is_modular* is True only for the
        directory case.
    """
    # URLs are never probed on the filesystem.
    if not _is_url(input_source) and Path(input_source).is_dir():
        return load_ontology_by_module(input_source), True

    graph = Graph()
    graph.parse(input_source)
    return {_derive_module_name(input_source, graph): graph}, False
def _resolve_shapes_base(input_source: str, modules: dict[str, Graph], is_modular: bool, shapes_base: Optional[str]) -> str:
    """Pick the base IRI for generated shapes.

    An explicit non-empty *shapes_base* always wins. Otherwise modular input
    uses ``SHAPES_BASE`` and single-graph input derives the base from the
    first (only) graph in *modules*.
    """
    if shapes_base:
        return shapes_base
    if not is_modular:
        sole_graph = next(iter(modules.values()))
        return _derive_shapes_base(input_source, sole_graph)
    return SHAPES_BASE
def _build_class_to_modules(modules: dict[str, Graph]) -> dict[str, list[str]]:
    """Index the described classes by the modules that describe them.

    A class counts as described in a module when that module's graph gives it
    a description containing the property listing marker. Module names are
    listed in the iteration order of *modules*.
    """
    index: dict[str, list[str]] = {}
    for name, graph in modules.items():
        for cls in graph.subjects(RDF.type, OWL.Class, unique=True):
            description = graph.value(cls, DC_DESCRIPTION)
            if not description:
                continue
            if "The properties that can be used" not in str(description):
                continue
            index.setdefault(str(cls), []).append(name)
    return index
def _resolve_root_class_uris(modules: dict[str, Graph], class_to_modules: dict[str, list[str]], root_classes: Optional[dict[str, str]] = None) -> set[str]:
    """Return the URIs of the classes that receive ``sh:targetClass``.

    When *root_classes* is given (even if empty), its values are used as is.
    Otherwise all module graphs are merged, triples and prefix bindings alike,
    and the roots are detected as the described classes that nothing
    references.
    """
    if root_classes is not None:
        return set(root_classes.values())

    merged = Graph()
    for graph in modules.values():
        for triple in graph:
            merged.add(triple)
        for prefix, namespace in graph.namespaces():
            merged.bind(prefix, namespace)
    return _detect_root_classes(merged, set(class_to_modules))
def _bind_namespaces(shacl: Graph, modules: dict[str, Graph]) -> None:
    """Copy every prefix binding of every module graph onto *shacl*."""
    for graph in modules.values():
        for prefix, namespace in graph.namespaces():
            shacl.bind(prefix, namespace)
def _bind_shape_namespaces(shacl: Graph, modules: dict[str, Graph], shapes_base: str, is_modular: bool) -> None:
    """Bind prefixes for the namespaces that generated shapes live in.

    Modular input gets one ``skg_sh_<module>`` prefix per module, bound to
    ``<shapes_base><module>/``. Single-graph input gets one ``<module>_sh``
    prefix bound to *shapes_base*. Hyphens in prefix names become underscores.
    """
    if not is_modular:
        only_module = next(iter(modules))
        shacl.bind(only_module.replace("-", "_") + "_sh", Namespace(shapes_base))
        return

    for name in modules:
        prefix = f"skg-sh-{name}".replace("-", "_")
        shacl.bind(prefix, Namespace(shapes_base + name + "/"))
def _parse_property(prop_text: str, class_uri: str, g: Graph,
                    uri_ns_map: dict[str, str],
                    literal_prefix_map: dict[str, str]) -> tuple[URIRef, str, str | None, str | None, str, str]:
    """Parse one property line of a class description.

    Args:
        prop_text: The property line, e.g. ``ex:name -[1..N]-> xsd:string``.
        class_uri: URI of the class being processed (used in error messages).
        g: Graph used to resolve prefixes.
        uri_ns_map: Local name to namespace map built from the graph's URIs.
        literal_prefix_map: Prefixes declared inside literal values.

    Returns:
        ``(property URI, min, range separator, max, target, prop_text)``; the
        cardinality parts and the target are the raw strings matched by
        ``PROPERTY_PATTERN``.

    Raises:
        ValueError: If the line does not match the pattern or the property
            name cannot be resolved to a namespace.
    """
    match = re.match(PROPERTY_PATTERN, prop_text)
    if not match:
        raise ValueError(f"Invalid property format in {class_uri}: {prop_text}")

    prop_name, card_min, range_sep, card_max, target = match.groups()

    if ':' in prop_name:
        # Split on the first colon only; the pattern admits several colons and
        # a plain split() would fail with an opaque unpacking error.
        prop_prefix, prop_local = prop_name.split(':', 1)
        prop_ns = _resolve_namespace(prop_prefix, prop_local, g, uri_ns_map, literal_prefix_map)
        if not prop_ns:
            raise ValueError(f"Unknown prefix '{prop_prefix}' in {class_uri}: {prop_text}")
    else:
        # Unqualified names are resolved the same way unqualified targets are.
        prop_local = prop_name
        prop_ns = uri_ns_map.get(prop_name)
        if not prop_ns:
            raise ValueError(f"Cannot resolve unqualified name '{prop_name}' in {class_uri}: {prop_text}")

    prop_uri = URIRef(prop_ns + prop_local)
    return prop_uri, card_min, range_sep, card_max, target, prop_text
def _resolve_target(target: str, class_uri: str, prop_text: str, g: Graph,
                    class_to_modules: dict[str, list[str]], module_name: str,
                    shapes_base: str, is_modular: bool, SH: Namespace,
                    uri_ns_map: dict[str, str],
                    literal_prefix_map: dict[str, str]) -> tuple[str, URIRef]:
    """Translate a property target into a SHACL constraint.

    Returns:
        ``(constraint local name, value)``:

        * ``('nodeKind', sh:Literal)`` for ``rdfs:Literal`` / ``rdfs:langString``;
        * ``('datatype', <XSD IRI>)`` for ``xsd:`` targets;
        * ``('node', <shape IRI>)`` when the target is a described class; the
          shape of the current module is preferred, otherwise the
          alphabetically first module describing the class is used;
        * ``('nodeKind', sh:BlankNodeOrIRI)`` for any other resolvable target.

    Raises:
        ValueError: If the target's prefix or unqualified name cannot be
            resolved to a namespace.
    """
    # Literal and datatype targets use no resolved namespace, so they are
    # handled before resolution and cannot fail on an unbound prefix.
    if target in ("rdfs:Literal", "rdfs:langString"):
        return 'nodeKind', SH.Literal
    if target.startswith("xsd:"):
        datatype_local = target.split(':', 1)[1]
        return 'datatype', URIRef(f"http://www.w3.org/2001/XMLSchema#{datatype_local}")

    if ':' in target:
        # First colon only, matching how controlled vocabulary values are split.
        target_prefix, target_local = target.split(':', 1)
        target_ns = _resolve_namespace(target_prefix, target_local, g, uri_ns_map, literal_prefix_map)
        if not target_ns:
            raise ValueError(f"Unknown prefix '{target_prefix}' in {class_uri}: {prop_text}")
    else:
        target_local = target
        target_ns = uri_ns_map.get(target)
        if not target_ns:
            raise ValueError(f"Cannot resolve unqualified name '{target}' in {class_uri}: {prop_text}")

    target_class_uri = str(URIRef(target_ns + target_local))
    if target_class_uri not in class_to_modules:
        return 'nodeKind', SH.BlankNodeOrIRI

    target_modules = class_to_modules[target_class_uri]
    target_module = module_name if module_name in target_modules else sorted(target_modules)[0]
    target_shape_ns = shapes_base + target_module + "/" if is_modular else shapes_base
    return 'node', URIRef(target_shape_ns + target_local + "Shape")
def _resolve_controlled_vocabulary(target: str, class_uri: str, prop_text: str,
                                   g: Graph, uri_ns_map: dict[str, str],
                                   literal_prefix_map: dict[str, str]) -> list[Node]:
    """Resolve the values of a ``{a b c}`` controlled vocabulary to URIs.

    Values are whitespace-separated. Each may be an absolute http(s) URI, a
    ``prefix:local`` name, or an unqualified local name looked up in
    *uri_ns_map*. The result keeps the order of the values.

    Raises:
        ValueError: If a prefix or an unqualified name cannot be resolved.
    """
    resolved: list[Node] = []
    for token in target.strip('{}').split():
        if token.startswith(('http://', 'https://')):
            resolved.append(URIRef(token))
            continue

        if ':' in token:
            prefix, local = token.split(':', 1)
            namespace = _resolve_namespace(prefix, local, g, uri_ns_map, literal_prefix_map)
            if not namespace:
                raise ValueError(f"Unknown prefix '{prefix}' in {class_uri}: {prop_text}")
        else:
            local = token
            namespace = uri_ns_map.get(token)
            if not namespace:
                raise ValueError(f"Cannot resolve unqualified name '{token}' in {class_uri}: {prop_text}")
        resolved.append(URIRef(namespace + local))
    return resolved
def _emit_cardinality(bnode: BNode, card_min: str, range_sep: str | None,
                      card_max: str | None, shacl: Graph, SH: Namespace) -> None:
    """Add ``sh:minCount`` / ``sh:maxCount`` for a parsed cardinality.

    A single number without a range separator is an exact cardinality and
    sets both bounds. Otherwise each numeric bound is emitted on its own;
    ``*`` and ``N`` mean unbounded and emit nothing.
    """
    unbounded = ('*', 'N')

    def as_count(value: str) -> Literal:
        return Literal(int(value), datatype=XSD.integer)

    if range_sep is None and card_min not in unbounded:
        exact = as_count(card_min)
        shacl.add((bnode, SH.minCount, exact))
        shacl.add((bnode, SH.maxCount, exact))
        return

    for predicate, bound in ((SH.minCount, card_min), (SH.maxCount, card_max)):
        if bound and bound not in unbounded:
            shacl.add((bnode, predicate, as_count(bound)))
def _emit_properties(parsed: list[tuple[URIRef, str, str | None, str | None, str, str]],
                     class_uri: str, g: Graph, shape_uri: URIRef,
                     class_to_modules: dict[str, list[str]], module_name: str,
                     shapes_base: str, is_modular: bool, shacl: Graph, SH: Namespace,
                     uri_ns_map: dict[str, str],
                     literal_prefix_map: dict[str, str]) -> None:
    """Add the ``sh:property`` constraints of one node shape to *shacl*.

    Parsed property lines are grouped by property URI and one property shape
    is emitted per URI. The cardinality comes from the first line of a group
    only. A single target becomes a direct constraint on the property shape;
    several targets become alternatives in an ``sh:or`` list. In both cases a
    ``{...}`` target is emitted as ``sh:in``.

    Args:
        parsed: Tuples as returned by ``_parse_property``.
        class_uri: URI of the described class (used in error messages).
        g: Graph of the module being processed.
        shape_uri: Node shape that receives the property shapes.
        class_to_modules: Described class URI to module names.
        module_name: Name of the module being processed.
        shapes_base: Base IRI of the shapes namespace.
        is_modular: Whether shape IRIs include the module name.
        shacl: Output graph.
        SH: The SHACL namespace.
        uri_ns_map: Local name to namespace map for *g*.
        literal_prefix_map: Prefixes declared inside literals of *g*.

    Raises:
        ValueError: Propagated from target or vocabulary resolution.
    """
    def add_target_constraint(node: BNode, target: str, prop_text: str) -> None:
        # Attach the constraint for one target to `node`.
        if target.startswith('{'):
            vocab_uris = _resolve_controlled_vocabulary(
                target, class_uri, prop_text, g, uri_ns_map, literal_prefix_map)
            list_node = BNode()
            Collection(shacl, list_node, vocab_uris)
            shacl.add((node, SH['in'], list_node))
        else:
            constraint_type, constraint_value = _resolve_target(
                target, class_uri, prop_text, g, class_to_modules, module_name,
                shapes_base, is_modular, SH, uri_ns_map, literal_prefix_map)
            shacl.add((node, SH[constraint_type], constraint_value))

    grouped: dict[URIRef, list[tuple[str, str | None, str | None, str, str]]] = {}
    for prop_uri, card_min, range_sep, card_max, target, prop_text in parsed:
        grouped.setdefault(prop_uri, []).append((card_min, range_sep, card_max, target, prop_text))

    for prop_uri, entries in grouped.items():
        bnode = BNode()
        shacl.add((shape_uri, SH.property, bnode))
        shacl.add((bnode, SH.path, prop_uri))

        # Only the first entry's cardinality is used for the whole group.
        card_min, range_sep, card_max, _, _ = entries[0]
        _emit_cardinality(bnode, card_min, range_sep, card_max, shacl, SH)

        if len(entries) == 1:
            add_target_constraint(bnode, entries[0][3], entries[0][4])
            continue

        # Several targets for one property: each becomes an sh:or alternative.
        # Controlled vocabularies are handled here too, since the plain target
        # resolver cannot parse a `{...}` list.
        or_members = []
        for _, _, _, target, prop_text in entries:
            member = BNode()
            add_target_constraint(member, target, prop_text)
            or_members.append(member)
        list_node = BNode()
        Collection(shacl, list_node, or_members)
        shacl.add((bnode, SH['or'], list_node))
def create_shacl_shapes(input_source: str | Path, shapes_base: Optional[str] = None, root_classes: Optional[dict[str, str]] = None) -> Graph:
    """Build a SHACL shapes graph from the property listings of an ontology.

    Every ``owl:Class`` whose description contains the property listing marker
    yields a node shape named ``<ClassLocalName>Shape``. Root classes also get
    ``sh:targetClass``. Each listed property becomes a property shape.

    Args:
        input_source: Ontology file, directory of modules, or URL.
        shapes_base: Base IRI for shapes; derived from the input when omitted.
        root_classes: Optional mapping whose values are the root class URIs;
            when omitted, root classes are detected automatically.

    Returns:
        The populated SHACL graph.

    Raises:
        ValueError: If a property line is malformed or cannot be resolved.
    """
    source = str(input_source)
    modules, is_modular = _load_source(source)
    base = _resolve_shapes_base(source, modules, is_modular, shapes_base)

    SH = Namespace("http://www.w3.org/ns/shacl#")
    shacl = Graph()
    shacl.bind('sh', SH)
    _bind_namespaces(shacl, modules)

    class_to_modules = _build_class_to_modules(modules)
    root_class_uris = _resolve_root_class_uris(modules, class_to_modules, root_classes)
    _bind_shape_namespaces(shacl, modules, base, is_modular)

    for module_name, g in modules.items():
        # Modular input gets a per-module shape namespace.
        shape_prefix = base + module_name + "/" if is_modular else base
        uri_ns_map = _build_uri_namespace_map(g)
        literal_prefix_map = _extract_prefixes_from_literals(g)

        for cls in g.subjects(RDF.type, OWL.Class, unique=True):
            desc = g.value(cls, DC_DESCRIPTION)
            desc_str = str(desc) if desc else ""
            if "The properties that can be used" not in desc_str:
                continue

            class_uri = str(cls)
            shape_uri = URIRef(shape_prefix + get_class_local_name(class_uri) + "Shape")
            shacl.add((shape_uri, RDF.type, SH.NodeShape))
            if class_uri in root_class_uris:
                shacl.add((shape_uri, SH.targetClass, cls))

            # The chunk before the first bullet is the introductory sentence.
            bullet_chunks = [c for c in re.split(r'\n[*-] ', desc_str) if c.strip()][1:]
            parsed = [
                _parse_property(chunk.strip(), class_uri, g, uri_ns_map, literal_prefix_map)
                for chunk in bullet_chunks
            ]
            _emit_properties(parsed, class_uri, g, shape_uri,
                             class_to_modules, module_name,
                             base, is_modular, shacl, SH,
                             uri_ns_map, literal_prefix_map)

    return shacl
def main():
    """Command-line entry point: read an ontology and write its SHACL shapes.

    Parses the command line, optionally loads a JSON file of root classes,
    generates the shapes graph and serializes it as Turtle to the output path.
    """
    cli = argparse.ArgumentParser(description='Extract SHACL shapes from OWL ontologies')
    cli.add_argument('input', help='Input ontology (file path, directory, or URL)')
    cli.add_argument('output', help='Output SHACL file path')
    cli.add_argument('--shapes-base', help='Base URL for shapes namespace')
    cli.add_argument('--root-classes', help='JSON file mapping module names to root class URIs')
    options = cli.parse_args()

    root_classes = None
    if options.root_classes:
        root_classes = json.loads(Path(options.root_classes).read_text(encoding='utf-8'))

    shapes = create_shacl_shapes(
        options.input,
        shapes_base=options.shapes_base,
        root_classes=root_classes,
    )
    shapes.serialize(destination=options.output, format="turtle", encoding="utf-8")
# Run the command-line interface only when the module is executed as a script.
if __name__ == "__main__":  # pragma: no cover
    main()