Source code for pyudbm.binding.utap_builder

"""
Python-first UTAP builder surface for authoring, patching, and exporting
``ModelDocument`` instances without writing XML directly.

This module provides two public styles:

* mutable chain builders for interactive authoring and patching
  (``ModelBuilder``, ``TemplateBuilder``);
* immutable spec dataclasses for deterministic generation and snapshots
  (``ModelSpec``, ``TemplateSpec``, ``LocationSpec``, ``EdgeSpec``,
  ``QuerySpec``).

Both styles normalize into one payload and delegate final native model
construction to the official UTAP-backed helper in ``pyudbm.binding._utap``.

.. note::

    This API is intentionally LLM-friendly: prefer semantic selectors
    (``where=...``), use ``inspect()`` and ``list_*()`` to retrieve stable
    snapshots and explicit indexes, and keep ``index`` operations as fallback
    only.

.. note::

    Patch operations fail loudly by design: selector miss and selector
    ambiguity both raise clear errors, and ambiguity errors include candidate
    indexes for deterministic recovery in automation pipelines.

Example::

    >>> from pyudbm.binding import ModelBuilder
    >>> document = (
    ...     ModelBuilder()
    ...     .clock("x")
    ...     .template("P")
    ...         .location("Init", initial=True)
    ...         .location("Done")
    ...         .edge("Init", "Done", when="x >= 1")
    ...         .end()
    ...     .process("P1", "P")
    ...     .system("P1")
    ...     .query("A[] not deadlock")
    ...     .build()
    ... )
    >>> document.templates[0].name
    'P'

Example::

    >>> builder = ModelBuilder.from_document(document)
    >>> snapshot = builder.inspect()
    >>> builder.update_query(where={"formula": "A[] not deadlock"}, comment="patched by pipeline")
    >>> with builder.edit_template("P") as template:
    ...     template.update_edge(where={"source": "Init", "target": "Done"}, when="x >= 2")
    >>> patched = builder.build()
    >>> len(snapshot["templates"]) >= 1
    True

Example::

    >>> from pyudbm.binding.utap_builder import LocationSpec, ModelSpec, TemplateSpec, build_model
    >>> spec_document = build_model(
    ...     ModelSpec(
    ...         templates=(TemplateSpec("P", locations=(LocationSpec("Init", initial=True),)),),
    ...         processes=(("P1", "P", ()),),
    ...         system_process_names=("P1",),
    ...     )
    ... )
    >>> spec_document.processes[0].name
    'P1'
"""

from __future__ import annotations

from collections.abc import Mapping as MappingABC
from dataclasses import dataclass, field
import xml.etree.ElementTree as ET
from typing import Iterable, List, Optional, Sequence, Tuple, Union

from ._utap import _build_model as _native_build_model
from .utap import Expectation, ModelDocument, Option, Query

__all__ = [
    "EdgeSpec",
    "LocationSpec",
    "ModelBuilder",
    "ModelSpec",
    "QuerySpec",
    "TemplateBuilder",
    "TemplateSpec",
    "build_model",
]

_UNSET = object()


[docs] @dataclass(frozen=True) class LocationSpec: """ Immutable specification of one template location. :param name: Location name. :type name: str :param initial: Whether this is the initial location. :type initial: bool :param invariant: Optional invariant text. :type invariant: str :param urgent: Whether the location is urgent. :type urgent: bool :param committed: Whether the location is committed. :type committed: bool Example:: >>> from pyudbm.binding.utap_builder import LocationSpec >>> LocationSpec("Init", initial=True).initial True """ name: str initial: bool = False invariant: str = "" urgent: bool = False committed: bool = False
[docs] @dataclass(frozen=True) class EdgeSpec: """ Immutable specification of one template edge. ``guard``, ``sync``, and ``update`` are the normalized textual fields ultimately consumed by the native builder. :param source: Source location name. :type source: str :param target: Target location name. :type target: str :param guard: Optional normalized guard text. :type guard: str :param sync: Optional normalized synchronisation text. :type sync: str :param update: Optional normalized assignment text. :type update: str Example:: >>> from pyudbm.binding.utap_builder import EdgeSpec >>> EdgeSpec("Idle", "Busy", sync="tick!").sync 'tick!' """ source: str target: str guard: str = "" sync: str = "" update: str = ""
[docs] @dataclass(frozen=True) class QuerySpec: """ Immutable specification of one model query. :param formula: Query formula text. :type formula: str :param comment: Optional query comment text. :type comment: str :param options: Query-local option entries. :type options: Tuple[Option, ...] :param expectation: Optional expectation metadata. :type expectation: Expectation or None :param location: Optional query location text. :type location: str Example:: >>> from pyudbm.binding.utap_builder import QuerySpec >>> QuerySpec("A[] not deadlock").formula 'A[] not deadlock' """ formula: str comment: str = "" options: Tuple[Option, ...] = () expectation: Optional[Expectation] = None location: str = ""
[docs] @dataclass(frozen=True) class TemplateSpec: """ Immutable specification of one template. :param name: Template name. :type name: str :param parameters: Optional raw parameter text. :type parameters: str :param declarations: Raw local declaration blocks. :type declarations: Tuple[str, ...] :param locations: Template locations. :type locations: Tuple[LocationSpec, ...] :param edges: Template edges. :type edges: Tuple[EdgeSpec, ...] Example:: >>> from pyudbm.binding.utap_builder import LocationSpec, TemplateSpec >>> spec = TemplateSpec("P", locations=(LocationSpec("Init", initial=True),)) >>> spec.name 'P' """ name: str parameters: str = "" declarations: Tuple[str, ...] = () locations: Tuple[LocationSpec, ...] = () edges: Tuple[EdgeSpec, ...] = ()
[docs] @dataclass(frozen=True) class ModelSpec: """ Immutable specification of one UTAP model. ``processes`` uses tuples of the shape ``(process_name, template_name, arguments)`` where ``arguments`` is a tuple of raw textual argument expressions. :param declarations: Raw global declaration blocks. :type declarations: Tuple[str, ...] :param templates: Template specifications. :type templates: Tuple[TemplateSpec, ...] :param processes: Process-instantiation tuples. :type processes: Tuple[Tuple[str, str, Tuple[str, ...]], ...] :param system_process_names: Process names listed in the system line. :type system_process_names: Tuple[str, ...] :param queries: Query specifications. :type queries: Tuple[QuerySpec, ...] Example:: >>> from pyudbm.binding.utap_builder import LocationSpec, ModelSpec, TemplateSpec >>> spec = ModelSpec( ... declarations=("clock x;",), ... templates=(TemplateSpec("P", locations=(LocationSpec("Init", initial=True),)),), ... processes=(("P1", "P", ()),), ... system_process_names=("P1",), ... ) >>> spec.system_process_names ('P1',) """ declarations: Tuple[str, ...] = () templates: Tuple[TemplateSpec, ...] = () processes: Tuple[Tuple[str, str, Tuple[str, ...]], ...] = () system_process_names: Tuple[str, ...] = () queries: Tuple[QuerySpec, ...] = ()
@dataclass class _LocationState: name: str invariant: str urgent: bool committed: bool initial: bool @dataclass class _EdgeState: source: str target: str guard: str sync: str update: str @dataclass class _TemplateState: name: str parameters: str declaration_lines: List[str] = field(default_factory=list) locations: List[_LocationState] = field(default_factory=list) edges: List[_EdgeState] = field(default_factory=list) initial_location: Optional[str] = None @dataclass class _ProcessState: name: str template_name: str arguments: Tuple[str, ...] def _require_text(field_name: str, value: str) -> str: if not isinstance(value, str): raise TypeError("{0} must be a string".format(field_name)) text = value.strip() if not text: raise ValueError("{0} must not be empty".format(field_name)) return text def _normalize_optional_text(field_name: str, value: str) -> str: if not isinstance(value, str): raise TypeError("{0} must be a string".format(field_name)) return value.strip() def _stringify_expression(field_name: str, value: object) -> str: if isinstance(value, str): text = value.strip() else: text = str(value).strip() if not text: raise ValueError("{0} must not be empty".format(field_name)) return text def _normalize_names(kind: str, names: Sequence[str]) -> Tuple[str, ...]: if not names: raise ValueError("{0} requires at least one name".format(kind)) normalized = tuple(_require_text("{0} name".format(kind), item) for item in names) if len(set(normalized)) != len(normalized): raise ValueError("{0} names must be unique within one call".format(kind)) return normalized def _normalize_options(options: Iterable[Union[Option, Tuple[str, str]]]) -> Tuple[Option, ...]: result = [] for item in options: if isinstance(item, Option): result.append(item) else: try: name, value = item except (TypeError, ValueError): raise TypeError("query options must be Option instances or (name, value) pairs") result.append(Option(_require_text("option name", name), _require_text("option value", value))) return tuple(result) def _normalize_expectation(expectation: Optional[Expectation]) -> Expectation: if expectation is None: return Expectation("Symbolic", "True", "", ()) if not isinstance(expectation, Expectation): raise TypeError("expectation must be an Expectation instance or None") return expectation def _normalize_reset(reset: object) -> Tuple[Tuple[str, str], ...]: if reset in (None, ()): return () if isinstance(reset, MappingABC): items = reset.items() elif isinstance(reset, str): raise TypeError("reset must be a mapping or an iterable of (name, value) pairs") else: items = reset result = [] for item in items: try: name, value = item except (TypeError, ValueError): raise TypeError("reset must contain (name, value) pairs") result.append((_require_text("reset name", name), _stringify_expression("reset value", value))) return tuple(result) def _render_reset(reset: object) -> str: entries = _normalize_reset(reset) if not entries: return "" return ", ".join("{0} = {1}".format(name, value) for name, value in entries) def _normalize_guard_text(guard: str, when: str) -> str: normalized_guard = _normalize_optional_text("guard", guard) normalized_when = _normalize_optional_text("when", when) if normalized_guard and normalized_when and normalized_guard != normalized_when: raise ValueError("edge guard is ambiguous: use either 'guard' or 'when'") return normalized_guard or normalized_when def _normalize_sync_text(sync: str, send: str, recv: str) -> str: normalized_sync = _normalize_optional_text("sync", sync) normalized_send = _normalize_optional_text("send", send) normalized_recv = _normalize_optional_text("recv", recv) if normalized_send and normalized_recv: raise ValueError("edge sync is ambiguous: use only one of 'send' or 'recv'") derived_sync = "" if normalized_send: derived_sync = "{0}!".format(normalized_send) elif normalized_recv: derived_sync = "{0}?".format(normalized_recv) if normalized_sync and derived_sync and normalized_sync != derived_sync: raise ValueError("edge sync is ambiguous: use either 'sync' or 'send'/'recv'") return normalized_sync or derived_sync def _normalize_update_text(update: str, reset: object) -> str: normalized_update = _normalize_optional_text("update", update) rendered_reset = _render_reset(reset) if normalized_update and rendered_reset and normalized_update != rendered_reset: raise ValueError("edge update is ambiguous: use either 'update' or 'reset'") return normalized_update or rendered_reset def _normalize_process_arguments(arguments: Iterable[object]) -> Tuple[str, ...]: if isinstance(arguments, (str, bytes)): raise TypeError("process arguments must be an iterable of argument expressions, not a string") try: iterator = iter(arguments) except TypeError: raise TypeError("process arguments must be an iterable of argument expressions") return tuple(_stringify_expression("process argument", item) for item in iterator) def _normalize_declaration_texts(texts: Sequence[str]) -> Tuple[str, ...]: return tuple(_require_text("text", item) for item in texts) def _resolve_index(kind: str, size: int, index: int) -> int: if not isinstance(index, int): raise TypeError("{0} index must be an integer".format(kind)) resolved = index + size if index < 0 else index if resolved < 0 or resolved >= size: raise IndexError("{0} index {1} is out of range".format(kind, index)) return resolved def _normalize_selector( kind: str, where: object, normalizers: MappingABC, ) -> dict: if not isinstance(where, MappingABC): raise TypeError("{0} selector must be a mapping".format(kind)) if not where: raise ValueError("{0} selector must not be empty".format(kind)) normalized = {} for key, value in where.items(): if not isinstance(key, str): raise TypeError("{0} selector field name must be a string".format(kind)) if key not in normalizers: raise ValueError( "{0} selector field {1!r} is not supported".format(kind, key) ) normalized[key] = normalizers[key]("{0} selector {1}".format(kind, key), value) return normalized def _resolve_selector_index( kind: str, *, index: object, where: object, size: int, matcher, formatter, ) -> int: has_index = index is not _UNSET has_where = where is not _UNSET if has_index and has_where: raise ValueError("{0} selection is ambiguous: use either 'index' or 'where'".format(kind)) if not has_index and not has_where: raise ValueError("{0} selection requires either 'index' or 'where'".format(kind)) if has_index: return _resolve_index(kind, size, index) matched = [current_index for current_index in range(size) if matcher(current_index)] if not matched: raise ValueError("{0} selector {1!r} matched no {0}".format(kind, where)) if len(matched) > 1: plural_kind = "queries" if kind == "query" else "{0}s".format(kind) details = "; ".join(formatter(current_index) for current_index in matched) raise ValueError( "{0} selector {1!r} is ambiguous: matched {2} {3}: {4}".format( kind, where, len(matched), plural_kind, details ) ) return matched[0] def _option_snapshot(option: Option) -> dict: return { "name": option.name, "value": option.value, } def _expectation_snapshot(expectation: Expectation) -> dict: return { "value_type": expectation.value_type, "status": expectation.status, "value": expectation.value, "resources": tuple( { "name": resource.name, "value": resource.value, "unit": resource.unit, } for resource in expectation.resources ), } def _query_snapshot(index: int, query: Query) -> dict: return { "index": index, "formula": query.formula, "comment": query.comment, "options": tuple(_option_snapshot(option) for option in query.options), "expectation": _expectation_snapshot(query.expectation), "location": query.location, } def _location_snapshot(index: int, location: _LocationState) -> dict: return { "index": index, "name": location.name, "initial": location.initial, "invariant": location.invariant, "urgent": location.urgent, "committed": location.committed, } def _edge_snapshot(index: int, edge: _EdgeState) -> dict: return { "index": index, "source": edge.source, "target": edge.target, "guard": edge.guard, "sync": edge.sync, "update": edge.update, } def _query_match(query: Query, selector: MappingABC) -> bool: for key, value in selector.items(): if getattr(query, key) != value: return False return True def _edge_match(edge: _EdgeState, selector: MappingABC) -> bool: for key, value in selector.items(): if getattr(edge, key) != value: return False return True def _normalize_process_entry(process: Tuple[str, ...]) -> Tuple[str, str, Tuple[str, ...]]: if not isinstance(process, tuple): raise TypeError("process specs must be tuples") if len(process) == 2: name, template_name = process arguments = () elif len(process) == 3: name, template_name, arguments = process else: raise ValueError("process specs must have the shape (name, template_name) or (name, template_name, arguments)") return ( _require_text("process name", name), _require_text("template name", template_name), _normalize_process_arguments(arguments), ) def _normalize_location_spec(location: LocationSpec) -> LocationSpec: if not isinstance(location, LocationSpec): raise TypeError("template locations must be LocationSpec instances") return LocationSpec( name=_require_text("location name", location.name), initial=bool(location.initial), invariant=_normalize_optional_text("invariant", location.invariant), urgent=bool(location.urgent), committed=bool(location.committed), ) def _normalize_edge_spec(edge: EdgeSpec) -> EdgeSpec: if not isinstance(edge, EdgeSpec): raise TypeError("template edges must be EdgeSpec instances") return EdgeSpec( source=_require_text("source", edge.source), target=_require_text("target", edge.target), guard=_normalize_optional_text("guard", edge.guard), sync=_normalize_optional_text("sync", edge.sync), update=_normalize_optional_text("update", edge.update), ) def _normalize_query_spec(query: QuerySpec) -> QuerySpec: if not isinstance(query, QuerySpec): raise TypeError("model queries must be QuerySpec instances") return QuerySpec( formula=_require_text("formula", query.formula), comment=_normalize_optional_text("comment", query.comment), options=_normalize_options(query.options), expectation=_normalize_expectation(query.expectation), location=_normalize_optional_text("location", query.location), ) def _normalize_template_spec(template: TemplateSpec) -> TemplateSpec: if not isinstance(template, TemplateSpec): raise TypeError("model templates must be TemplateSpec instances") return TemplateSpec( name=_require_text("template name", template.name), parameters=_normalize_optional_text("parameters", template.parameters), declarations=tuple(_require_text("declaration", item) for item in template.declarations), locations=tuple(_normalize_location_spec(item) for item in template.locations), edges=tuple(_normalize_edge_spec(item) for item in template.edges), ) def _normalize_model_spec(spec: ModelSpec) -> ModelSpec: if not isinstance(spec, ModelSpec): raise TypeError("spec must be a ModelSpec instance") if spec.system_process_names: system_process_names = _normalize_names("system process", spec.system_process_names) else: system_process_names = () return ModelSpec( declarations=tuple(_require_text("declaration", item) for item in spec.declarations), templates=tuple(_normalize_template_spec(item) for item in spec.templates), processes=tuple(_normalize_process_entry(item) for item in spec.processes), system_process_names=system_process_names, queries=tuple(_normalize_query_spec(item) for item in spec.queries), ) def _builder_from_model_spec(spec: ModelSpec) -> "ModelBuilder": normalized_spec = _normalize_model_spec(spec) builder = ModelBuilder() builder._declaration_lines = list(normalized_spec.declarations) builder._templates = [ _TemplateState( name=template.name, parameters=template.parameters, declaration_lines=list(template.declarations), locations=[ _LocationState( name=location.name, invariant=location.invariant, urgent=location.urgent, committed=location.committed, initial=location.initial, ) for location in template.locations ], edges=[ _EdgeState( source=edge.source, target=edge.target, guard=edge.guard, sync=edge.sync, update=edge.update, ) for edge in template.edges ], initial_location=next(location.name for location in template.locations if location.initial), ) for template in normalized_spec.templates ] builder._processes = [ _ProcessState( name=process_name, template_name=template_name, arguments=arguments, ) for process_name, template_name, arguments in normalized_spec.processes ] builder._system_process_names = tuple(normalized_spec.system_process_names) builder._queries = [ Query( formula=query.formula, comment=query.comment, options=query.options, expectation=query.expectation, location=query.location, ) for query in normalized_spec.queries ] return builder def _validate_model_spec(spec: ModelSpec) -> None: if not spec.system_process_names: raise ValueError("system_process_names must not be empty") template_names = set() for template in spec.templates: if template.name in template_names: raise ValueError("duplicate template {0!r}".format(template.name)) template_names.add(template.name) location_names = set() initial_locations = [] for location in template.locations: if location.name in location_names: raise ValueError("template {0!r} has duplicate location {1!r}".format(template.name, location.name)) location_names.add(location.name) if location.initial: initial_locations.append(location.name) if not initial_locations: raise ValueError("template {0!r} must define one initial location".format(template.name)) if len(initial_locations) > 1: raise ValueError("template {0!r} has multiple initial locations".format(template.name)) for edge in template.edges: if edge.source not in location_names: raise ValueError("template {0!r} edge source {1!r} does not exist".format(template.name, edge.source)) if edge.target not in location_names: raise ValueError("template {0!r} edge target {1!r} does not exist".format(template.name, edge.target)) process_names = set() for process_name, template_name, _ in spec.processes: if process_name in process_names: raise ValueError("duplicate process {0!r}".format(process_name)) process_names.add(process_name) if template_name not in template_names: raise ValueError("process {0!r} references unknown template {1!r}".format(process_name, template_name)) for process_name in spec.system_process_names: if process_name not in process_names: raise ValueError("system references unknown process {0!r}".format(process_name)) def _payload_from_model_spec(spec: ModelSpec) -> dict: return { "declarations": tuple(spec.declarations), "templates": tuple( { "name": template.name, "parameters": template.parameters, "declarations": tuple(template.declarations), "locations": tuple( { "name": location.name, "invariant": location.invariant, "urgent": location.urgent, "committed": location.committed, } for location in template.locations ), "edges": tuple( { "source": edge.source, "target": edge.target, "guard": edge.guard, "sync": edge.sync, "update": edge.update, } for edge in template.edges ), "initial_location": next(location.name for location in template.locations if location.initial), } for template in spec.templates ), "processes": tuple( { "name": process_name, "template_name": template_name, "arguments": tuple(arguments), } for process_name, template_name, arguments in spec.processes ), "system_process_names": tuple(spec.system_process_names), "queries": tuple( { "formula": query.formula, "comment": query.comment, "options": tuple({"name": option.name, "value": option.value} for option in query.options), "expectation": { "value_type": query.expectation.value_type, "status": query.expectation.status, "value": query.expectation.value, "resources": tuple( { "name": resource.name, "value": resource.value, "unit": resource.unit, } for resource in query.expectation.resources ), }, "location": query.location, } for query in spec.queries ), } def _build_document_from_spec(spec: ModelSpec) -> ModelDocument: normalized_spec = _normalize_model_spec(spec) _validate_model_spec(normalized_spec) return ModelDocument(_native_build_model(_payload_from_model_spec(normalized_spec))) def _split_expression_list(text: str) -> Tuple[str, ...]: stripped = text.strip() if not stripped: return () items = [] current = [] paren_depth = 0 bracket_depth = 0 brace_depth = 0 quote = None escaped = False for character in stripped: if quote is not None: current.append(character) if escaped: escaped = False elif character == "\\": escaped = True elif character == quote: quote = None continue if character in ("'", '"'): quote = character current.append(character) continue if character == "(": paren_depth += 1 elif character == ")": paren_depth -= 1 elif character == "[": bracket_depth += 1 elif character == "]": bracket_depth -= 1 elif character == "{": brace_depth += 1 elif character == "}": brace_depth -= 1 elif character == "," and paren_depth == 0 and bracket_depth == 0 and brace_depth == 0: part = "".join(current).strip() if not part: raise ValueError("process argument text must not contain empty entries") items.append(part) current = [] continue current.append(character) if quote is not None or paren_depth != 0 or bracket_depth != 0 or brace_depth != 0: raise ValueError("process argument text is not balanced") tail = "".join(current).strip() if not tail: raise ValueError("process argument text must not contain empty entries") items.append(tail) return tuple(items) def _read_xml_text(element: Optional[ET.Element]) -> str: return "" if element is None or element.text is None else element.text.strip() def _find_label_text(parent: ET.Element, kind: str) -> str: for label in parent.findall("label"): if label.attrib.get("kind") == kind: return _read_xml_text(label) return "" def _model_spec_from_document(document: ModelDocument) -> ModelSpec: if not isinstance(document, ModelDocument): raise TypeError("document must be a ModelDocument instance") if document.options: raise ValueError("from_document() does not support document-level options") if document.before_update_text.strip(): raise ValueError("from_document() does not support before_update declarations") if document.after_update_text.strip(): raise ValueError("from_document() does not support after_update declarations") if document.channel_priority_texts: raise ValueError("from_document() does not support channel priority declarations") root = ET.fromstring(document.dumps()) declarations = () global_declaration_text = _read_xml_text(root.find("declaration")) if global_declaration_text: declarations = (global_declaration_text,) templates = [] for template_element in root.findall("template"): template_name = _require_text("template name", _read_xml_text(template_element.find("name"))) if template_element.findall("branchpoint"): raise ValueError("from_document() does not support branchpoints in template {0!r}".format(template_name)) location_name_by_id = {} initial_ref = _require_text("initial location ref", template_element.find("init").attrib.get("ref", "")) for location_element in template_element.findall("location"): location_id = _require_text("location id", location_element.attrib.get("id", "")) location_name_by_id[location_id] = _require_text("location name", _read_xml_text(location_element.find("name"))) if initial_ref not in location_name_by_id: raise ValueError("from_document() could not resolve initial location in template {0!r}".format(template_name)) locations = [] for location_element in template_element.findall("location"): location_name = location_name_by_id[_require_text("location id", location_element.attrib.get("id", ""))] locations.append( LocationSpec( name=location_name, initial=(location_name == location_name_by_id[initial_ref]), invariant=_find_label_text(location_element, "invariant"), urgent=location_element.find("urgent") is not None, committed=location_element.find("committed") is not None, ) ) edges = [] for transition_element in template_element.findall("transition"): source_element = transition_element.find("source") target_element = transition_element.find("target") if source_element is None or target_element is None: raise ValueError("from_document() requires transitions with both source and target references") source_ref = _require_text("source ref", source_element.attrib.get("ref", "")) target_ref = _require_text("target ref", target_element.attrib.get("ref", "")) if source_ref not in location_name_by_id or target_ref not in location_name_by_id: raise ValueError( "from_document() does not support branchpoint transitions in template {0!r}".format(template_name) ) select_text = _find_label_text(transition_element, "select") if select_text: raise ValueError("from_document() does not support edge select clauses in template {0!r}".format(template_name)) probability_text = _find_label_text(transition_element, "probability") if probability_text: raise ValueError( "from_document() does not support edge probability labels in template {0!r}".format(template_name) ) comment_text = _find_label_text(transition_element, "comments") if comment_text: raise ValueError("from_document() does not support edge comments in template {0!r}".format(template_name)) edges.append( EdgeSpec( source=location_name_by_id[source_ref], target=location_name_by_id[target_ref], guard=_find_label_text(transition_element, "guard"), sync=_find_label_text(transition_element, "synchronisation"), update=_find_label_text(transition_element, "assignment"), ) ) declaration_text = _read_xml_text(template_element.find("declaration")) templates.append( TemplateSpec( name=template_name, parameters=_read_xml_text(template_element.find("parameter")), declarations=(declaration_text,) if declaration_text else (), locations=tuple(locations), edges=tuple(edges), ) ) queries = [] default_expectation = Expectation("Symbolic", "True", "", ()) for query in document.queries: if not query.formula.strip(): if query.comment.strip() or query.options or query.expectation != default_expectation: raise ValueError("from_document() does not support empty imported query formulas with metadata") else: queries.append( QuerySpec( formula=query.formula, comment=query.comment, options=query.options, expectation=query.expectation, location=query.location, ) ) return ModelSpec( declarations=declarations, templates=tuple(templates), processes=tuple( (process.name, process.template_name, _split_expression_list(process.arguments)) for process in document.processes ), system_process_names=tuple(process.name for process in document.processes), queries=tuple(queries), )
[docs] def build_model(spec: ModelSpec) -> ModelDocument: """ Build one public :class:`ModelDocument` from a :class:`ModelSpec`. :param spec: Immutable model specification. :type spec: ModelSpec :return: Parsed public model document. :rtype: ModelDocument Example:: >>> from pyudbm.binding.utap_builder import LocationSpec, ModelSpec, TemplateSpec, build_model >>> document = build_model( ... ModelSpec( ... declarations=("clock x;",), ... templates=(TemplateSpec("P", locations=(LocationSpec("Init", initial=True),)),), ... processes=(("P1", "P", ()),), ... system_process_names=("P1",), ... ) ... ) >>> document.templates[0].name 'P' """ return _build_document_from_spec(spec)
[docs] class ModelBuilder: """ Build one UTAP model from lightweight Python authoring calls. The builder focuses on the common small-model path: global declarations, templates, processes, system composition, and queries. Calling :meth:`build` returns a public :class:`ModelDocument`. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> document = ( ... ModelBuilder() ... .clock("x") ... .template("P") ... .location("Init", initial=True) ... .end() ... .process("P1", "P") ... .system("P1") ... .build() ... ) >>> document.templates[0].name 'P' """
[docs] def __init__(self): """ Initialize one empty model builder. :return: ``None``. :rtype: None Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> isinstance(ModelBuilder(), ModelBuilder) True """ self._declaration_lines = [] self._templates = [] self._processes = [] self._system_process_names = None self._queries = []
[docs] @classmethod def from_document(cls, document: ModelDocument) -> "ModelBuilder": """ Rebuild one mutable builder from a public :class:`ModelDocument`. This import path is limited to the current builder surface. Template branchpoints, document-level options, and other unsupported features are rejected with clear errors instead of being silently discarded. :param document: Imported public document snapshot. :type document: ModelDocument :return: Builder initialized from the imported document. :rtype: ModelBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> document = ( ... ModelBuilder() ... .clock("x") ... .template("P") ... .location("Init", initial=True) ... .end() ... .process("P1", "P") ... .system("P1") ... .build() ... ) >>> imported = ModelBuilder.from_document(document) >>> isinstance(imported, ModelBuilder) True """ return _builder_from_model_spec(_model_spec_from_document(document))
[docs] def declaration(self, text: str) -> "ModelBuilder": """ Append one raw global declaration block. :param text: Declaration text appended to the global declaration section. :type text: str :return: The current builder. :rtype: ModelBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().declaration("const int N = 1;") >>> isinstance(builder, ModelBuilder) True """ self._declaration_lines.append(_require_text("text", text)) return self
[docs] def set_declarations(self, *texts: str) -> "ModelBuilder": """ Replace all global declaration blocks. Passing no texts clears the current global declaration list. :param texts: Replacement declaration blocks. :type texts: str :return: The current builder. :rtype: ModelBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().clock("x").set_declarations("clock y;") >>> isinstance(builder, ModelBuilder) True """ self._declaration_lines = list(_normalize_declaration_texts(texts)) return self
[docs] def clock(self, *names: str) -> "ModelBuilder": """ Append one ``clock`` declaration for one or more names. :param names: Clock names declared in one statement. :type names: str :return: The current builder. :rtype: ModelBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().clock("x", "y") >>> isinstance(builder, ModelBuilder) True """ normalized = _normalize_names("clock", names) self._declaration_lines.append("clock {0};".format(", ".join(normalized))) return self
[docs] def chan(self, *names: str, broadcast: bool = False, urgent: bool = False) -> "ModelBuilder": """ Append one channel declaration for one or more names. :param names: Channel names declared in one statement. :type names: str :param broadcast: Whether the declaration is for broadcast channels. :type broadcast: bool :param urgent: Whether the declaration is for urgent channels. :type urgent: bool :return: The current builder. :rtype: ModelBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().chan("tick", urgent=True) >>> isinstance(builder, ModelBuilder) True """ normalized = _normalize_names("channel", names) prefix_parts = [] if urgent: prefix_parts.append("urgent") if broadcast: prefix_parts.append("broadcast") prefix_parts.append("chan") self._declaration_lines.append("{0} {1};".format(" ".join(prefix_parts), ", ".join(normalized))) return self
[docs] def integer( self, name: str, *, lower: Optional[object] = None, upper: Optional[object] = None, init: Optional[object] = None, const: bool = False ) -> "ModelBuilder": """ Append one integer declaration. When both ``lower`` and ``upper`` are provided, the declaration uses the bounded form ``int[min,max] name``. When ``const=True``, an initializer is required. :param name: Integer variable name. :type name: str :param lower: Optional lower bound. :type lower: object or None :param upper: Optional upper bound. :type upper: object or None :param init: Optional initializer expression. :type init: object or None :param const: Whether to declare a constant integer. :type const: bool :return: The current builder. :rtype: ModelBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().integer("count", lower=0, upper=3, init=1) >>> isinstance(builder, ModelBuilder) True """ normalized_name = _require_text("integer name", name) if (lower is None) != (upper is None): raise ValueError("integer bounds require both 'lower' and 'upper'") if const and init is None: raise ValueError("const integer {0!r} requires an initializer".format(normalized_name)) prefix = "const " if const else "" if lower is None: type_text = "int" else: type_text = "int[{0},{1}]".format( _stringify_expression("lower", lower), _stringify_expression("upper", upper), ) declaration = "{0}{1} {2}".format(prefix, type_text, normalized_name) if init is not None: declaration = "{0} = {1}".format(declaration, _stringify_expression("init", init)) self._declaration_lines.append(declaration + ";") return self
[docs] def template(self, name: str, *, parameters: str = "", declaration: str = "") -> "TemplateBuilder": """ Start one template builder owned by this model. :param name: Template name. :type name: str :param parameters: Optional raw template parameter text. :type parameters: str :param declaration: Optional raw local declaration text. :type declaration: str :return: The template builder. :rtype: TemplateBuilder :raises ValueError: If the template name already exists. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder, TemplateBuilder >>> template = ModelBuilder().template("P") >>> isinstance(template, TemplateBuilder) True """ normalized_name = _require_text("template name", name) if any(item.name == normalized_name for item in self._templates): raise ValueError("duplicate template {0!r}".format(normalized_name)) state = _TemplateState(name=normalized_name, parameters=_normalize_optional_text("parameters", parameters)) if declaration.strip(): state.declaration_lines.append(_require_text("declaration", declaration)) self._templates.append(state) return TemplateBuilder(self, state)
[docs] def update_template( self, name: str, *, new_name: object = _UNSET, parameters: object = _UNSET, ) -> "ModelBuilder": """ Update one existing template header. :param name: Existing template name. :type name: str :param new_name: Optional replacement template name. :type new_name: object :param parameters: Optional replacement parameter text. :type parameters: object :return: The current builder. :rtype: ModelBuilder :raises ValueError: If the template does not exist or the new name is duplicated. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .end() ... .update_template("P", new_name="Worker") ... ) >>> isinstance(builder, ModelBuilder) True """ normalized_name = _require_text("template name", name) for state in self._templates: if state.name == normalized_name: template_state = state break else: raise ValueError("unknown template {0!r}".format(normalized_name)) final_name = template_state.name if new_name is not _UNSET: final_name = _require_text("new template name", new_name) if final_name != template_state.name and any(item.name == final_name for item in self._templates): raise ValueError("duplicate template {0!r}".format(final_name)) if parameters is not _UNSET: template_state.parameters = _normalize_optional_text("parameters", parameters) if final_name != template_state.name: old_name = template_state.name template_state.name = final_name for process_state in self._processes: if process_state.template_name == old_name: process_state.template_name = final_name return self
[docs] def edit_template(self, name: str) -> "TemplateBuilder": """ Open one existing template for incremental edits. This is intended for import-edit-export workflows started with :meth:`from_document`. :param name: Existing template name. :type name: str :return: Template builder bound to the existing template state. :rtype: TemplateBuilder :raises ValueError: If the template does not exist. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> document = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .end() ... .process("P1", "P") ... .system("P1") ... .build() ... ) >>> template = ModelBuilder.from_document(document).edit_template("P") >>> isinstance(template, TemplateBuilder) True """ normalized_name = _require_text("template name", name) for state in self._templates: if state.name == normalized_name: return TemplateBuilder(self, state) raise ValueError("unknown template {0!r}".format(normalized_name))
[docs] def remove_template(self, name: str) -> "ModelBuilder": """ Remove one template and all processes instantiated from it. Any matching process names are also removed from the current system declaration order. :param name: Template name to remove. :type name: str :return: The current builder. :rtype: ModelBuilder :raises ValueError: If the template does not exist. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .end() ... .process("P1", "P") ... .system("P1") ... .remove_template("P") ... ) >>> isinstance(builder, ModelBuilder) True """ normalized_name = _require_text("template name", name) for index, state in enumerate(self._templates): if state.name == normalized_name: del self._templates[index] break else: raise ValueError("unknown template {0!r}".format(normalized_name)) removed_process_names = {process.name for process in self._processes if process.template_name == normalized_name} self._processes = [process for process in self._processes if process.template_name != normalized_name] if self._system_process_names is not None and removed_process_names: self._system_process_names = tuple( process_name for process_name in self._system_process_names if process_name not in removed_process_names ) return self
[docs] def process(self, name: str, template: str, *arguments: object) -> "ModelBuilder": """ Append one process instantiation line. :param name: Process instance name. :type name: str :param template: Template name referenced by the process. :type template: str :param arguments: Optional raw argument texts rendered with :func:`str`. :type arguments: object :return: The current builder. :rtype: ModelBuilder :raises ValueError: If the process name already exists. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .end() ... .process("P1", "P") ... ) >>> isinstance(builder, ModelBuilder) True """ normalized_name = _require_text("process name", name) normalized_template = _require_text("template name", template) if any(item.name == normalized_name for item in self._processes): raise ValueError("duplicate process {0!r}".format(normalized_name)) self._processes.append( _ProcessState( name=normalized_name, template_name=normalized_template, arguments=_normalize_process_arguments(arguments), ) ) return self
[docs] def update_process( self, name: str, *, new_name: object = _UNSET, template: object = _UNSET, arguments: object = _UNSET, ) -> "ModelBuilder": """ Update one existing process instantiation. :param name: Existing process name. :type name: str :param new_name: Optional replacement process name. :type new_name: object :param template: Optional replacement template name. :type template: object :param arguments: Optional replacement iterable of argument texts. :type arguments: object :return: The current builder. :rtype: ModelBuilder :raises ValueError: If the process does not exist or the new name is duplicated. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .end() ... .process("P1", "P") ... .update_process("P1", new_name="Main") ... ) >>> isinstance(builder, ModelBuilder) True """ normalized_name = _require_text("process name", name) for state in self._processes: if state.name == normalized_name: process_state = state break else: raise ValueError("unknown process {0!r}".format(normalized_name)) final_name = process_state.name if new_name is not _UNSET: final_name = _require_text("new process name", new_name) if final_name != process_state.name and any(item.name == final_name for item in self._processes): raise ValueError("duplicate process {0!r}".format(final_name)) if template is not _UNSET: process_state.template_name = _require_text("template name", template) if arguments is not _UNSET: process_state.arguments = _normalize_process_arguments(arguments) if final_name != process_state.name: old_name = process_state.name process_state.name = final_name if self._system_process_names is not None: self._system_process_names = tuple( final_name if process_name == old_name else process_name for process_name in self._system_process_names ) return self
[docs] def remove_process(self, name: str) -> "ModelBuilder": """ Remove one process instantiation. The process name is also removed from the current system order if present. :param name: Process name to remove. :type name: str :return: The current builder. :rtype: ModelBuilder :raises ValueError: If the process does not exist. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .end() ... .process("P1", "P") ... .system("P1") ... .remove_process("P1") ... ) >>> isinstance(builder, ModelBuilder) True """ normalized_name = _require_text("process name", name) for index, state in enumerate(self._processes): if state.name == normalized_name: del self._processes[index] break else: raise ValueError("unknown process {0!r}".format(normalized_name)) if self._system_process_names is not None: self._system_process_names = tuple( process_name for process_name in self._system_process_names if process_name != normalized_name ) return self
[docs] def system(self, *process_names: str) -> "ModelBuilder": """ Set the process order used by the system declaration. :param process_names: Process instance names listed in the system declaration. :type process_names: str :return: The current builder. :rtype: ModelBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().system("P1") >>> isinstance(builder, ModelBuilder) True """ self._system_process_names = _normalize_names("system process", process_names) return self
[docs] def list_templates(self) -> Tuple[dict, ...]: """ List current template entries with explicit indexes. :return: Template snapshots. :rtype: Tuple[dict, ...] Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().template("P").location("Init", initial=True).end() >>> builder.list_templates()[0]["name"] 'P' """ return tuple( { "index": index, "name": template.name, "parameters": template.parameters, "declarations": tuple(template.declaration_lines), "initial_location": template.initial_location, "location_count": len(template.locations), "edge_count": len(template.edges), } for index, template in enumerate(self._templates) )
[docs] def list_processes(self) -> Tuple[dict, ...]: """ List current process entries with explicit indexes. :return: Process snapshots. :rtype: Tuple[dict, ...] Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().template("P").location("Init", initial=True).end().process("P1", "P") >>> builder.list_processes()[0]["name"] 'P1' """ system_indexes = {name: index for index, name in enumerate(self._system_process_names or ())} return tuple( { "index": index, "name": process.name, "template_name": process.template_name, "arguments": tuple(process.arguments), "in_system": process.name in system_indexes, "system_index": system_indexes.get(process.name), } for index, process in enumerate(self._processes) )
[docs] def list_queries(self) -> Tuple[dict, ...]: """ List current query entries with explicit indexes. :return: Query snapshots. :rtype: Tuple[dict, ...] Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().query("A[] not deadlock") >>> builder.list_queries()[0]["formula"] 'A[] not deadlock' """ return tuple(_query_snapshot(index, query) for index, query in enumerate(self._queries))
[docs] def list_locations(self, template: str) -> Tuple[dict, ...]: """ List locations of one template with explicit indexes. :param template: Template name. :type template: str :return: Location snapshots. :rtype: Tuple[dict, ...] Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().template("P").location("Init", initial=True).end() >>> builder.list_locations("P")[0]["name"] 'Init' """ normalized_name = _require_text("template name", template) for state in self._templates: if state.name == normalized_name: return tuple(_location_snapshot(index, location) for index, location in enumerate(state.locations)) raise ValueError("unknown template {0!r}".format(normalized_name))
[docs] def list_edges(self, template: str) -> Tuple[dict, ...]: """ List edges of one template with explicit indexes. :param template: Template name. :type template: str :return: Edge snapshots. :rtype: Tuple[dict, ...] Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().template("P").location("Init", initial=True).location("Done").edge("Init", "Done").end() >>> builder.list_edges("P")[0]["source"] 'Init' """ normalized_name = _require_text("template name", template) for state in self._templates: if state.name == normalized_name: return tuple(_edge_snapshot(index, edge) for index, edge in enumerate(state.edges)) raise ValueError("unknown template {0!r}".format(normalized_name))
[docs] def inspect(self) -> dict: """ Return one full JSON-friendly snapshot of current builder state. :return: Full builder snapshot. :rtype: dict Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> snapshot = ModelBuilder().template("P").location("Init", initial=True).end().inspect() >>> snapshot["templates"][0]["name"] 'P' """ return { "declarations": tuple(self._declaration_lines), "templates": tuple( { "index": index, "name": template.name, "parameters": template.parameters, "declarations": tuple(template.declaration_lines), "initial_location": template.initial_location, "locations": tuple( _location_snapshot(location_index, location) for location_index, location in enumerate(template.locations) ), "edges": tuple( _edge_snapshot(edge_index, edge) for edge_index, edge in enumerate(template.edges) ), } for index, template in enumerate(self._templates) ), "processes": self.list_processes(), "system_process_names": tuple(self._system_process_names or ()), "queries": self.list_queries(), }
[docs] def query( self, formula: str, *, comment: str = "", options: Iterable[Union[Option, Tuple[str, str]]] = (), expectation: Optional[Expectation] = None, location: str = "", ) -> "ModelBuilder": """ Append one model query. :param formula: Query formula text. :type formula: str :param comment: Optional query comment text. :type comment: str :param options: Optional query-local options. :type options: Iterable[Option or Tuple[str, str]] :param expectation: Optional structured expectation metadata. :type expectation: Expectation or None :param location: Optional query location text. :type location: str :return: The current builder. :rtype: ModelBuilder Example:: >>> from pyudbm.binding import Expectation >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().query("A[] not deadlock", expectation=Expectation("", "", "", ())) >>> isinstance(builder, ModelBuilder) True """ self._queries.append( Query( formula=_require_text("formula", formula), comment=_normalize_optional_text("comment", comment), options=_normalize_options(options), expectation=_normalize_expectation(expectation), location=_normalize_optional_text("location", location), ) ) return self
[docs] def update_query( self, index: object = _UNSET, *, where: object = _UNSET, formula: object = _UNSET, comment: object = _UNSET, options: object = _UNSET, expectation: object = _UNSET, location: object = _UNSET, ) -> "ModelBuilder": """ Update one existing query by ``index`` or ``where`` selector. Negative indexes follow normal Python indexing rules when ``index`` is used. :param index: Optional query index. :type index: object :param where: Optional query selector mapping. :type where: object :param formula: Optional replacement formula text. :type formula: object :param comment: Optional replacement comment text. :type comment: object :param options: Optional replacement query options. :type options: object :param expectation: Optional replacement expectation metadata. :type expectation: object :param location: Optional replacement query location text. :type location: object :return: The current builder. :rtype: ModelBuilder .. note:: When you choose the ``index`` path, get stable query indexes first via :meth:`list_queries` or from the ``queries`` section returned by :meth:`inspect`. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().query("A[] not deadlock").update_query(where={"formula": "A[] not deadlock"}, comment="patched") >>> isinstance(builder, ModelBuilder) True """ selector = _UNSET if where is not _UNSET: selector = _normalize_selector( "query", where, { "formula": _require_text, "comment": _normalize_optional_text, "location": _normalize_optional_text, }, ) resolved = _resolve_selector_index( "query", index=index, where=selector, size=len(self._queries), matcher=lambda current_index: _query_match(self._queries[current_index], selector), formatter=lambda current_index: ( "index={0} formula={1!r} comment={2!r} location={3!r}".format( current_index, self._queries[current_index].formula, self._queries[current_index].comment, self._queries[current_index].location, ) ), ) current = self._queries[resolved] self._queries[resolved] = Query( formula=current.formula if formula is _UNSET else _require_text("formula", formula), comment=current.comment if comment is _UNSET else _normalize_optional_text("comment", comment), options=current.options if options is _UNSET else _normalize_options(options), expectation=current.expectation if expectation is _UNSET else _normalize_expectation(expectation), location=current.location if location is _UNSET else _normalize_optional_text("location", location), ) return self
[docs] def remove_query(self, index: object = _UNSET, *, where: object = _UNSET) -> "ModelBuilder": """ Remove one query by ``index`` or ``where`` selector. Negative indexes follow normal Python indexing rules when ``index`` is used. :param index: Optional query index. :type index: object :param where: Optional query selector mapping. :type where: object :return: The current builder. :rtype: ModelBuilder .. note:: For index-based deletion, retrieve current indexes from :meth:`list_queries` or from the ``queries`` section returned by :meth:`inspect`. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().query("A[] not deadlock").remove_query(where={"formula": "A[] not deadlock"}) >>> isinstance(builder, ModelBuilder) True """ selector = _UNSET if where is not _UNSET: selector = _normalize_selector( "query", where, { "formula": _require_text, "comment": _normalize_optional_text, "location": _normalize_optional_text, }, ) resolved = _resolve_selector_index( "query", index=index, where=selector, size=len(self._queries), matcher=lambda current_index: _query_match(self._queries[current_index], selector), formatter=lambda current_index: ( "index={0} formula={1!r} comment={2!r} location={3!r}".format( current_index, self._queries[current_index].formula, self._queries[current_index].comment, self._queries[current_index].location, ) ), ) del self._queries[resolved] return self
[docs] def to_spec(self) -> ModelSpec: """ Export the current builder state as one public :class:`ModelSpec`. :return: Immutable model specification snapshot. :rtype: ModelSpec Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder, ModelSpec >>> spec = ModelBuilder().clock("x").template("P").location("Init", initial=True).end().to_spec() >>> isinstance(spec, ModelSpec) True """ return ModelSpec( declarations=tuple(self._declaration_lines), templates=tuple( TemplateSpec( name=template.name, parameters=template.parameters, declarations=tuple(template.declaration_lines), locations=tuple( LocationSpec( name=location.name, initial=location.initial, invariant=location.invariant, urgent=location.urgent, committed=location.committed, ) for location in template.locations ), edges=tuple( EdgeSpec( source=edge.source, target=edge.target, guard=edge.guard, sync=edge.sync, update=edge.update, ) for edge in template.edges ), ) for template in self._templates ), processes=tuple((process.name, process.template_name, tuple(process.arguments)) for process in self._processes), system_process_names=tuple(self._system_process_names or ()), queries=tuple( QuerySpec( formula=query.formula, comment=query.comment, options=tuple(query.options), expectation=query.expectation, location=query.location, ) for query in self._queries ), )
[docs] def build(self) -> ModelDocument: """ Build the current model into one public :class:`ModelDocument`. :return: Parsed public model document. :rtype: ModelDocument Example:: >>> from pyudbm.binding import ModelBuilder, ModelDocument >>> document = ( ... ModelBuilder() ... .clock("x") ... .template("P") ... .location("Init", initial=True) ... .end() ... .process("P1", "P") ... .system("P1") ... .build() ... ) >>> isinstance(document, ModelDocument) True """ return _build_document_from_spec(self.to_spec())
[docs] class TemplateBuilder: """ Build one template inside a :class:`ModelBuilder`. The template builder is name-first and string-friendly. Locations and edges are accumulated on the owning model until :meth:`end` returns the parent builder again. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder, TemplateBuilder >>> template = ModelBuilder().template("P") >>> isinstance(template, TemplateBuilder) True """
[docs] def __init__(self, model_builder: ModelBuilder, state: _TemplateState): """ Initialize one template builder owned by ``model_builder``. :param model_builder: Owning model builder. :type model_builder: ModelBuilder :param state: Mutable internal template state. :type state: _TemplateState :return: ``None``. :rtype: None """ self._model_builder = model_builder self._state = state self._closed = False
[docs] def __enter__(self) -> "TemplateBuilder": """ Enter the template-builder context manager. :return: The current template builder. :rtype: TemplateBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> with ModelBuilder().template("P") as template: ... template.location("Init", initial=True) >>> type(template).__name__ 'TemplateBuilder' """ return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb) -> None: """ Leave the template-builder context manager. :param exc_type: Exception type, if any. :type exc_type: type or None :param exc_val: Exception value, if any. :type exc_val: BaseException or None :param exc_tb: Exception traceback, if any. :type exc_tb: object :return: ``None``. :rtype: None """ self._closed = True
[docs] def declaration(self, text: str) -> "TemplateBuilder": """ Append one raw local declaration block. :param text: Local declaration text. :type text: str :return: The current template builder. :rtype: TemplateBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> template = ModelBuilder().template("P").declaration("int i;") >>> type(template).__name__ 'TemplateBuilder' """ self._ensure_open() self._state.declaration_lines.append(_require_text("text", text)) return self
[docs] def set_declarations(self, *texts: str) -> "TemplateBuilder": """ Replace all local declaration blocks on the current template. Passing no texts clears the current local declaration list. :param texts: Replacement declaration blocks. :type texts: str :return: The current template builder. :rtype: TemplateBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> template = ModelBuilder().template("P").set_declarations("int i;") >>> type(template).__name__ 'TemplateBuilder' """ self._ensure_open() self._state.declaration_lines = list(_normalize_declaration_texts(texts)) return self
[docs] def location( self, name: str, *, initial: bool = False, invariant: str = "", urgent: bool = False, committed: bool = False ) -> "TemplateBuilder": """ Append one location to the current template. :param name: Location name. :type name: str :param initial: Whether this location is the initial location. :type initial: bool :param invariant: Optional invariant text. :type invariant: str :param urgent: Whether the location is urgent. :type urgent: bool :param committed: Whether the location is committed. :type committed: bool :return: The current template builder. :rtype: TemplateBuilder :raises ValueError: If the location name already exists or multiple initial locations are requested. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> template = ModelBuilder().template("P").location("Init", initial=True) >>> type(template).__name__ 'TemplateBuilder' """ self._ensure_open() normalized_name = _require_text("location name", name) if any(item.name == normalized_name for item in self._state.locations): raise ValueError( "template {0!r} has duplicate location {1!r}".format(self._state.name, normalized_name) ) if initial and self._state.initial_location is not None: raise ValueError("template {0!r} already has an initial location".format(self._state.name)) self._state.locations.append( _LocationState( name=normalized_name, invariant=_normalize_optional_text("invariant", invariant), urgent=urgent, committed=committed, initial=initial, ) ) if initial: self._state.initial_location = normalized_name return self
[docs] def update_location( self, name: str, *, new_name: object = _UNSET, initial: object = _UNSET, invariant: object = _UNSET, urgent: object = _UNSET, committed: object = _UNSET, ) -> "TemplateBuilder": """ Update one existing location by name. Renaming a location also updates all edges that refer to it. :param name: Existing location name. :type name: str :param new_name: Optional replacement location name. :type new_name: object :param initial: Optional replacement initial-location flag. :type initial: object :param invariant: Optional replacement invariant text. :type invariant: object :param urgent: Optional replacement urgent flag. :type urgent: object :param committed: Optional replacement committed flag. :type committed: object :return: The current template builder. :rtype: TemplateBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> template = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .update_location("Init", new_name="Start") ... ) >>> type(template).__name__ 'TemplateBuilder' """ self._ensure_open() normalized_name = _require_text("location name", name) for state in self._state.locations: if state.name == normalized_name: location_state = state break else: raise ValueError("template {0!r} has no location {1!r}".format(self._state.name, normalized_name)) final_name = location_state.name if new_name is not _UNSET: final_name = _require_text("new location name", new_name) if final_name != location_state.name and any(item.name == final_name for item in self._state.locations): raise ValueError("template {0!r} has duplicate location {1!r}".format(self._state.name, final_name)) if invariant is not _UNSET: location_state.invariant = _normalize_optional_text("invariant", invariant) if urgent is not _UNSET: location_state.urgent = bool(urgent) if committed is not _UNSET: location_state.committed = bool(committed) if initial is not _UNSET: is_initial = bool(initial) if is_initial: for item in self._state.locations: item.initial = False location_state.initial = True self._state.initial_location = final_name else: location_state.initial = False if self._state.initial_location == location_state.name: self._state.initial_location = None if final_name != location_state.name: old_name = location_state.name location_state.name = final_name if self._state.initial_location == old_name: self._state.initial_location = final_name for edge_state in self._state.edges: if edge_state.source == old_name: edge_state.source = final_name if edge_state.target == old_name: edge_state.target = final_name return self
[docs] def remove_location(self, name: str) -> "TemplateBuilder": """ Remove one location by name. Any incident edges are removed together with the location. :param name: Location name to remove. :type name: str :return: The current template builder. :rtype: TemplateBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> template = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .location("Done") ... .remove_location("Done") ... ) >>> type(template).__name__ 'TemplateBuilder' """ self._ensure_open() normalized_name = _require_text("location name", name) for index, state in enumerate(self._state.locations): if state.name == normalized_name: removed_location = self._state.locations.pop(index) break else: raise ValueError("template {0!r} has no location {1!r}".format(self._state.name, normalized_name)) self._state.edges = [ edge for edge in self._state.edges if edge.source != normalized_name and edge.target != normalized_name ] if removed_location.initial or self._state.initial_location == normalized_name: self._state.initial_location = None return self
[docs] def edge( self, source: str, target: str, *, when: str = "", guard: str = "", sync: str = "", send: str = "", recv: str = "", update: str = "", reset: object = () ) -> "TemplateBuilder": """ Append one edge between two named locations. ``when`` is an alias for ``guard``. ``send`` and ``recv`` are sugar for ``sync="name!"`` and ``sync="name?"``. ``reset`` is sugar for a comma-joined assignment list such as ``{"x": 0}``. :param source: Source location name. :type source: str :param target: Target location name. :type target: str :param when: Optional guard alias. :type when: str :param guard: Optional guard text. :type guard: str :param sync: Optional synchronisation text such as ``"tick?"``. :type sync: str :param send: Optional send-side synchronisation sugar. :type send: str :param recv: Optional receive-side synchronisation sugar. :type recv: str :param update: Optional assignment text. :type update: str :param reset: Optional mapping or pair-sequence rendered into an assignment list. :type reset: object :return: The current template builder. :rtype: TemplateBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> template = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .location("Done") ... .edge("Init", "Done", send="tick", reset={"x": 0}) ... ) >>> type(template).__name__ 'TemplateBuilder' """ self._ensure_open() self._state.edges.append( _EdgeState( source=_require_text("source", source), target=_require_text("target", target), guard=_normalize_guard_text(guard, when), sync=_normalize_sync_text(sync, send, recv), update=_normalize_update_text(update, reset), ) ) return self
[docs] def update_edge( self, index: object = _UNSET, *, where: object = _UNSET, source: object = _UNSET, target: object = _UNSET, when: object = _UNSET, guard: object = _UNSET, sync: object = _UNSET, send: object = _UNSET, recv: object = _UNSET, update: object = _UNSET, reset: object = _UNSET, ) -> "TemplateBuilder": """ Update one edge by ``index`` or ``where`` selector. Negative indexes follow normal Python indexing rules when ``index`` is used. :param index: Optional edge index. :type index: object :param where: Optional edge selector mapping. :type where: object :param source: Optional replacement source location name. :type source: object :param target: Optional replacement target location name. :type target: object :param when: Optional replacement guard alias. :type when: object :param guard: Optional replacement guard text. :type guard: object :param sync: Optional replacement synchronisation text. :type sync: object :param send: Optional replacement send-side sugar. :type send: object :param recv: Optional replacement receive-side sugar. :type recv: object :param update: Optional replacement assignment text. :type update: object :param reset: Optional replacement reset sugar. :type reset: object :return: The current template builder. :rtype: TemplateBuilder .. note:: For index-based updates, obtain edge indexes from :meth:`ModelBuilder.list_edges` (for the owning template) or from the template snapshot inside :meth:`ModelBuilder.inspect`. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> template = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .location("Done") ... .edge("Init", "Done") ... .update_edge(where={"source": "Init", "target": "Done"}, when="x >= 1") ... ) >>> type(template).__name__ 'TemplateBuilder' """ self._ensure_open() selector = _UNSET if where is not _UNSET: selector = _normalize_selector( "edge", where, { "source": _require_text, "target": _require_text, "guard": _normalize_optional_text, "sync": _normalize_optional_text, "update": _normalize_optional_text, }, ) resolved = _resolve_selector_index( "edge", index=index, where=selector, size=len(self._state.edges), matcher=lambda current_index: _edge_match(self._state.edges[current_index], selector), formatter=lambda current_index: ( "index={0} source={1!r} target={2!r} guard={3!r} sync={4!r} update={5!r}".format( current_index, self._state.edges[current_index].source, self._state.edges[current_index].target, self._state.edges[current_index].guard, self._state.edges[current_index].sync, self._state.edges[current_index].update, ) ), ) edge_state = self._state.edges[resolved] final_guard = edge_state.guard final_sync = edge_state.sync final_update = edge_state.update if guard is not _UNSET or when is not _UNSET: final_guard = _normalize_guard_text( edge_state.guard if guard is _UNSET else guard, "" if when is _UNSET else when, ) if sync is not _UNSET or send is not _UNSET or recv is not _UNSET: final_sync = _normalize_sync_text( edge_state.sync if sync is _UNSET else sync, "" if send is _UNSET else send, "" if recv is _UNSET else recv, ) if update is not _UNSET or reset is not _UNSET: final_update = _normalize_update_text( edge_state.update if update is _UNSET else update, () if reset is _UNSET else reset, ) edge_state.source = edge_state.source if source is _UNSET else _require_text("source", source) edge_state.target = edge_state.target if target is _UNSET else _require_text("target", target) edge_state.guard = final_guard edge_state.sync = final_sync edge_state.update = final_update return self
[docs] def remove_edge(self, index: object = _UNSET, *, where: object = _UNSET) -> "TemplateBuilder": """ Remove one edge by ``index`` or ``where`` selector. Negative indexes follow normal Python indexing rules when ``index`` is used. :param index: Optional edge index. :type index: object :param where: Optional edge selector mapping. :type where: object :return: The current template builder. :rtype: TemplateBuilder .. note:: For index-based deletion, use :meth:`ModelBuilder.list_edges` or :meth:`ModelBuilder.inspect` to fetch current edge indexes before patching. Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> template = ( ... ModelBuilder() ... .template("P") ... .location("Init", initial=True) ... .location("Done") ... .edge("Init", "Done") ... .remove_edge(where={"source": "Init", "target": "Done"}) ... ) >>> type(template).__name__ 'TemplateBuilder' """ self._ensure_open() selector = _UNSET if where is not _UNSET: selector = _normalize_selector( "edge", where, { "source": _require_text, "target": _require_text, "guard": _normalize_optional_text, "sync": _normalize_optional_text, "update": _normalize_optional_text, }, ) resolved = _resolve_selector_index( "edge", index=index, where=selector, size=len(self._state.edges), matcher=lambda current_index: _edge_match(self._state.edges[current_index], selector), formatter=lambda current_index: ( "index={0} source={1!r} target={2!r} guard={3!r} sync={4!r} update={5!r}".format( current_index, self._state.edges[current_index].source, self._state.edges[current_index].target, self._state.edges[current_index].guard, self._state.edges[current_index].sync, self._state.edges[current_index].update, ) ), ) del self._state.edges[resolved] return self
[docs] def end(self) -> ModelBuilder: """ Finish the current template and return the parent builder. :return: The owning model builder. :rtype: ModelBuilder Example:: >>> from pyudbm.binding.utap_builder import ModelBuilder >>> builder = ModelBuilder().template("P").location("Init", initial=True).end() >>> isinstance(builder, ModelBuilder) True """ self._closed = True return self._model_builder
def _ensure_open(self) -> None: if self._closed: raise RuntimeError("template builder is already closed")