Add `--skip-audits` flag to plan and run commands by airhorns · Pull Request #5730 · SQLMesh/sqlmesh

Expand Up @@ -200,6 +200,7 @@ def evaluate( allow_destructive_snapshots: t.Optional[t.Set[str]] = None, allow_additive_snapshots: t.Optional[t.Set[str]] = None, target_table_exists: t.Optional[bool] = None, skip_audits: bool = False, **kwargs: t.Any, ) -> t.List[AuditResult]: """Evaluate a snapshot and add the processed interval to the state sync. Expand Down Expand Up @@ -239,17 +240,19 @@ def evaluate( target_table_exists=target_table_exists, **kwargs, ) audit_results = self._audit_snapshot( snapshot=snapshot, environment_naming_info=environment_naming_info, start=start, end=end, execution_time=execution_time, snapshots=snapshots, deployability_index=deployability_index, wap_id=wap_id, **kwargs, ) audit_results: t.List[AuditResult] = [] if not skip_audits: audit_results = self._audit_snapshot( snapshot=snapshot, environment_naming_info=environment_naming_info, start=start, end=end, execution_time=execution_time, snapshots=snapshots, deployability_index=deployability_index, wap_id=wap_id, **kwargs, )
self.state_sync.add_interval( snapshot, start, end, is_dev=not is_deployable, last_altered_ts=now_timestamp() Expand All @@ -272,6 +275,7 @@ def run( deployability_index: t.Optional[DeployabilityIndex] = None, auto_restatement_enabled: bool = False, run_environment_statements: bool = False, skip_audits: bool = False, ) -> CompletionStatus: return self._run_or_audit( environment=environment, Expand All @@ -288,6 +292,7 @@ def run( deployability_index=deployability_index, auto_restatement_enabled=auto_restatement_enabled, run_environment_statements=run_environment_statements, skip_audits=skip_audits, )
def audit( Expand All @@ -304,7 +309,11 @@ def audit( circuit_breaker: t.Optional[t.Callable[[], bool]] = None, deployability_index: t.Optional[DeployabilityIndex] = None, run_environment_statements: bool = False, skip_audits: bool = False, ) -> CompletionStatus: if skip_audits: return CompletionStatus.SUCCESS
# Remove the intervals from the snapshots that will be audited so that they can be recomputed # by _run_or_audit as "missing intervals" to reuse the rest of it's logic remove_intervals = {} Expand Down Expand Up @@ -434,6 +443,7 @@ def run_merged_intervals( selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None, run_environment_statements: bool = False, audit_only: bool = False, skip_audits: bool = False, auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}, is_restatement: bool = False, ) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]: Expand Down Expand Up @@ -537,15 +547,16 @@ def run_node(node: SchedulingUnit) -> None: assert deployability_index # mypy
if audit_only: audit_results = self._audit_snapshot( snapshot=snapshot, environment_naming_info=environment_naming_info, deployability_index=deployability_index, snapshots=self.snapshots_by_name, start=start, end=end, execution_time=execution_time, ) if not skip_audits: audit_results = self._audit_snapshot( snapshot=snapshot, environment_naming_info=environment_naming_info, deployability_index=deployability_index, snapshots=self.snapshots_by_name, start=start, end=end, execution_time=execution_time, ) else: # If batch_index > 0, then the target table must exist since the first batch would have created it target_table_exists = ( Expand All @@ -563,6 +574,7 @@ def run_node(node: SchedulingUnit) -> None: allow_additive_snapshots=allow_additive_snapshots, target_table_exists=target_table_exists, selected_models=selected_models, skip_audits=skip_audits, )
evaluation_duration_ms = now_timestamp() - execution_start_ts Expand Down Expand Up @@ -785,6 +797,7 @@ def _run_or_audit( auto_restatement_enabled: bool = False, run_environment_statements: bool = False, audit_only: bool = False, skip_audits: bool = False, ) -> CompletionStatus: """Concurrently runs or audits all snapshots in topological order.
Expand Down Expand Up @@ -876,6 +889,7 @@ def _run_or_audit( end=end, run_environment_statements=run_environment_statements, audit_only=audit_only, skip_audits=skip_audits, auto_restatement_triggers=auto_restatement_triggers, selected_models={ s.node.dbt_unique_id for s in merged_intervals if s.node.dbt_unique_id Expand Down