PSI Data Pipeline
System of scheduled jobs that move PSI manufacturing data from source systems (AFTEC, network shares, vendor APIs) into Azure SQL (PSI_Analytics), with SQLite cache layers and a live operational dashboard. Powers PSI Explorer analytics views, Power BI, and downstream consumers.
Overview
The PSI Data Pipeline is a Python-based ETL system running on PS-PROXY. Nine independent scheduled jobs read from AFTEC CSV exports, the UniData API, vendor APIs (Rockwell, GE, Emerson, Dell), and network-share spreadsheets, then land the data in three storage tiers ending at Azure SQL.
| Property | Value |
|---|---|
| Repository | ProgressiveSurface/psi-data-pipeline |
| Runner | PS-PROXY (self-hosted GHE Actions, [self-hosted, ps-proxy]) |
| Orchestration | Prefect server + work pool (psi-pipeline) running locally on PS-PROXY |
| Target database | Azure SQL PSI_Analytics on procserv-proddata.database.windows.net |
| Live dashboard | explorer.progressivesurface.com/#pipeline |
| Stack | Python 3.12, SQLite, pyodbc, Prefect, MSAL service-principal auth |
Architecture
Three storage tiers; data flows ingest → analytics → Azure SQL:
| Tier | Path | Role | Lifecycle |
|---|---|---|---|
| psi_ingest.db | `\\ad.ptihome.com\DFS\Schedule\SS123\LEADTIME\psi_ingest.db` | Durable cache, scraper-owned | Persists across runs; preserves enrichment via INSERT OR REPLACE |
| psi_analytics.db | `\\ad.ptihome.com\DFS\Schedule\SS123\LEADTIME\psi_analytics.db` | Read-optimized analytics SQLite | Rebuilt nightly from AFTEC CSVs + imports from psi_ingest.db. Atomic deploy via .tmp rename. |
| PSI_Analytics | Azure SQL on procserv-proddata.database.windows.net | Production source of truth | Synced incrementally from the analytics tier; consumed by PSI Explorer and Power BI |
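The analytics tier is rebuilt nightly and deployed "via .tmp rename". A minimal sketch of that pattern, assuming the build writes a sibling file named psi_analytics.db.tmp and then renames it over the live file; `build_atomically` is a hypothetical name, not the pipeline's actual code:

```python
import os
import sqlite3
from pathlib import Path
from typing import Callable


def build_atomically(target: Path, build: Callable[[sqlite3.Connection], None]) -> None:
    """Build a fresh SQLite file beside `target`, then swap it into place.

    Readers keep seeing the previous file until os.replace() swaps in the
    finished one, so they never open a half-built database.
    """
    tmp = target.with_name(target.name + ".tmp")
    if tmp.exists():
        tmp.unlink()  # leftover from a run that crashed mid-build
    conn = sqlite3.connect(tmp)
    try:
        build(conn)
        conn.commit()
    finally:
        conn.close()  # release the file handle before renaming
    # Only reached if build() succeeded; overwrites any existing target.
    os.replace(tmp, target)
```

If `build` raises, the live file is untouched and the stale `.tmp` is cleared on the next run. On Windows, `os.replace` can fail with `PermissionError` while another process holds the target open, so a production deploy may need a retry around the rename.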
Architecture doc in repo: docs/pipeline-architecture.md — authoritative for flow patterns, env vars, “how to add a new job”, and troubleshooting.
Data flow patterns
The pipeline has three distinct patterns:
Standard scraper:
Source → load_*.py → psi_ingest.db → publish_ingest_tables.py
→ psi_analytics.db → sync_table_to_azure.py
→ Azure SQL → verify_passthrough_tables.py
Analytics build:
AFTEC CSVs + psi_ingest.db imports → build_analytics_db.py
→ psi_analytics.db
→ (internal incremental Azure SQL sync, manifest-based)
Raw AFTEC:
AFTEC CSV files → sync_aftec_raw.py → Azure SQL (no SQLite)
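Both the analytics build's internal sync and sync_table_to_azure.py are manifest-based and incremental; per Common operations, the manifest (.azure_sync_manifest.json) holds `row_count:col_count` signatures. A minimal sketch of that skip check, assuming the manifest is a flat JSON object mapping table name to signature (its real layout isn't documented here); all function names are hypothetical:

```python
import json
import sqlite3
from pathlib import Path


def load_manifest(path: Path) -> dict[str, str]:
    """Read the saved signatures; a missing manifest means 'sync everything'."""
    return json.loads(path.read_text()) if path.exists() else {}


def table_signature(conn: sqlite3.Connection, table: str) -> str:
    """Return the 'row_count:col_count' signature for one SQLite table."""
    # Table names come from the pipeline's own table list, not user input.
    rows = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]
    cols = len(conn.execute(f'PRAGMA table_info("{table}")').fetchall())
    return f"{rows}:{cols}"


def tables_to_sync(conn: sqlite3.Connection, tables: list[str], manifest_path: Path) -> list[str]:
    """Tables whose current signature differs from the one last synced."""
    manifest = load_manifest(manifest_path)
    return [t for t in tables if manifest.get(t) != table_signature(conn, t)]


def record_synced(conn: sqlite3.Connection, tables: list[str], manifest_path: Path) -> None:
    """After a successful upload, store the new signatures."""
    manifest = load_manifest(manifest_path)
    for t in tables:
        manifest[t] = table_signature(conn, t)
    manifest_path.write_text(json.dumps(manifest, indent=2))
```

The trade-off of a count-only signature: an in-place update that leaves row and column counts unchanged is skipped. That is why deleting the manifest is the way to force a full re-sync.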
Orchestration
Three layers cooperate, and the executor is mid-migration from GitHub Actions runners to the Prefect work pool. Both run on PS-PROXY today.
| Layer | Role |
|---|---|
| Prefect work pool (psi-pipeline) | Target executor. A Prefect worker on PS-PROXY picks up scheduled runs of the deployments registered by prefect_deployments.py and runs the flow directly. Every `load_*.py` / `sync_*.py` is decorated with @flow. |
| GitHub Actions | Legacy executor. Each workflow has its own cron: block and runs the same Python. Today this is still the primary scheduler. Goal: GH Actions stops scheduling so the runner is free for CI on push. |
| PSI Explorer dashboard (#pipeline) | Operator-facing UI. Reads the Prefect API and renders jobs, recent runs, and row counts. |
Cron lives in two places today: both `.github/workflows/*.yml` and prefect_deployments.py carry cron schedules. As each job graduates to Prefect-only execution, remove the cron: block from its GHE workflow (keep workflow_dispatch: for manual triggers).
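During the migration it helps to list which GHE workflows still declare a schedule. A quick sketch using a line-based text match (a heuristic, not a YAML parser); `workflows_with_cron` is a hypothetical helper, not part of the repo:

```python
import re
from pathlib import Path

# Matches a cron entry in a GitHub Actions schedule block, e.g.
#   schedule:
#     - cron: '0 6 * * *'
CRON_LINE = re.compile(r"^\s*-?\s*cron:\s*['\"]?([^'\"#]+?)['\"]?\s*(#.*)?$")


def workflows_with_cron(workflow_dir: Path) -> dict[str, list[str]]:
    """Map each workflow file name to the cron expressions it still declares."""
    found: dict[str, list[str]] = {}
    for path in sorted(workflow_dir.glob("*.yml")):
        crons = []
        for line in path.read_text(encoding="utf-8").splitlines():
            if line.lstrip().startswith("#"):
                continue  # ignore commented-out schedules
            match = CRON_LINE.match(line)
            if match:
                crons.append(match.group(1).strip())
        if crons:
            found[path.name] = crons
    return found
```

Run from the repo root as `workflows_with_cron(Path(".github/workflows"))`; any job that appears in the result and also has a Prefect deployment is still scheduled twice.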
Active pipeline jobs
| Workflow | Prefect deployment | Schedule (UTC) | Schedule (ET, UTC-5) | Duration | Tables owned |
|---|---|---|---|---|---|
| sync-aftec-raw.yml | sync-aftec-raw | `0 6 * * *` | Daily 1 AM | ~2 hours | 31 raw AFTEC tables (tslabor2, openwo, partmaster, ecn, redbook, etc.) |
| nightly-data-build.yml (Build Analytics DB) | none (see Monitoring gaps) | `30 6 * * *` | Daily 1:30 AM | ~30 min | projects, labor_detail, redbooks, machine_dna, workforce_features, comprehensive_view, plus all imported ingest tables |
| sync-kg-data.yml (Sync KG Data) | none (see Monitoring gaps) | `0 7 * * *` | Daily 2 AM | ~1 min | `kg_*` knowledge-graph tables (runs pipeline.cpq.ingest_to_graph) |
| sync-bom-data.yml | sync-bom-data | `0 8 * * 1-5` | Weekdays 3 AM | ~1 hour | bom_parts (subset for active projects, via UniData API batch endpoint) |
| sync-prims.yml | sync-prims | `0 5 * * 1-5` | Weekdays midnight | ~2 min | prims_installations |
| sync-dept-schedules.yml | sync-dept-schedules | `0 14 * * 1-5` | Weekdays 9 AM | ~2 min | dept_schedules, floor_space |
| sync-rockwell.yml | none (see Monitoring gaps) | `0 10 * * 1` | Mondays 5 AM | ~5 min | component_lifecycle, project_components (Allen-Bradley + GE/Emerson) |
| sync-product-taxonomy.yml | sync-product-taxonomy | `0 7 * * 1` | Mondays 2 AM | ~2 min | product_classes, product_catalog |
| enrich-dell-warranty.yml | enrich-dell-warranty | `0 12 * * 6` | Saturdays 7 AM | ~5 min | (updates prims_installations `dell_*` columns) |
| setup-env-vars.yml | n/a | manual | n/a | n/a | One-time: writes Azure secrets as machine env vars |
Monitoring gaps
- `nightly-data-build`, `sync-rockwell`, and `sync-kg-data` are not Prefect deployments yet, so they do not appear on the Explorer dashboard. Add them to `prefect_deployments.py` and re-run that script.
- `nightly-data-build` runs `verify_passthrough_tables.py` outside the Prefect flow, as a separate workflow step. The Prefect-tracked flow can succeed while verify fails, so the dashboard shows “Healthy” while GHE Actions is red. Move verify inside a `@flow` so failures surface.
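For the verify gap described above, the fix is to make a count mismatch raise inside the flow itself. A minimal sketch, assuming verify reduces to comparing per-table row counts gathered from the three tiers (verify_passthrough_tables.py's real checks may differ); the function names and tier keys are hypothetical, and the wrapper is left undecorated so the sketch runs without a Prefect server:

```python
from typing import Callable


def find_count_mismatches(counts: dict[str, dict[str, int]]) -> dict[str, dict[str, int]]:
    """Return tables whose row counts disagree across tiers.

    `counts` maps table name -> {tier name -> row count}, e.g.
    {"prims_installations": {"ingest": 10, "analytics": 10, "azure": 9}}.
    """
    return {t: per_tier for t, per_tier in counts.items() if len(set(per_tier.values())) > 1}


# In the pipeline this function would carry Prefect's @flow decorator.
def nightly_build_with_verify(build: Callable[[], None],
                              collect_counts: Callable[[], dict[str, dict[str, int]]]) -> None:
    build()
    mismatches = find_count_mismatches(collect_counts())
    if mismatches:
        # An unhandled exception inside a Prefect flow marks the run Failed,
        # so the dashboard badge would match the GHE Actions result.
        raise RuntimeError(f"row-count mismatch: {mismatches}")
```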
Data sources
| Source | Records | What |
|---|---|---|
| AFTEC CSV exports | 9.25M+ rows / 31 tables | Daily exports from UniData/AFTEC (labor, work orders, BOM, parts, PO, redbook, ECN, etc.) |
| UniData API batch endpoint | 5.76M BOM rows | Live BOM exports for active projects |
| Redbook SQLite (redbook_coq.db) | 40K RFCs | AI-classified quality issues |
| PRIMS Installations XLS | 650 PCs | PSI machine PC fleet inventory |
| Department schedule .xlsm files | 7 files | Weekly capacity by department + floor space |
| Rockwell / GE / Emerson APIs | 488 components | Allen-Bradley + GE + Emerson product lifecycle status |
| Dell CLI | per-PC warranty | Service tag → warranty end, model, ship date |
| Product taxonomy CSVs | 437 classes + 20,654 products | Product class hierarchy and catalog |
| LDS Excel files | ~3,000 projects | Per-project Gantt schedules (extracted by extract_lds_gantt.py) |
| OneNote Lessons Learned | 872 projects | Post-project lessons (extracted from .one binary) |
| Handoff .docx notes | ~2,000 projects | Risk notes, comp-machine references (extract_handoff_notes) |
Live dashboard
The PSI Explorer hosts a Data Pipeline dashboard at #pipeline that reads the Prefect API and renders:
- 6 currently deployed jobs as cards with status badges (Healthy / Partial / Failed)
- Per-job row counts and table counts
- The 50 most recent runs, with state, start time, and duration
- Summary counts (completed / failed / running)
- Manual “Run” button per job
- Link to the Prefect UI on PS-PROXY
Backend: analytics-routes.js in the psi-explorer-web repo queries the local Prefect API.
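The requests analytics-routes.js makes aren't shown here. As an illustration in Python, this sketch assumes the Prefect 2.x/3.x REST endpoint `POST /api/flow_runs/filter` on the local server and the flow run's `state_type` field; `fetch_recent_runs` and `summarize` are hypothetical names, and counting CRASHED runs as failed is an assumption of the sketch, not necessarily the dashboard's rule:

```python
import json
import urllib.request
from collections import Counter

PREFECT_API = "http://127.0.0.1:4200/api"  # local Prefect server on PS-PROXY


def fetch_recent_runs(limit: int = 50) -> list[dict]:
    """Ask the local Prefect server for the newest flow runs."""
    body = json.dumps({"limit": limit, "sort": "START_TIME_DESC"}).encode()
    req = urllib.request.Request(
        f"{PREFECT_API}/flow_runs/filter",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def summarize(runs: list[dict]) -> dict[str, int]:
    """Count runs by state type for completed / failed / running totals."""
    states = Counter(r.get("state_type") for r in runs)
    return {
        "completed": states["COMPLETED"],
        "failed": states["FAILED"] + states["CRASHED"],
        "running": states["RUNNING"],
    }
```

Usage on PS-PROXY: `summarize(fetch_recent_runs())`. Keeping the HTTP call separate from the counting keeps the summary logic testable without a server.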
Repository layout
psi-data-pipeline/
├── CLAUDE.md # Cold-start orientation for AI sessions
├── README.md # Quick start + doc index
├── BUILD_LOG.md # Dated change log (append on every change)
├── prefect_deployments.py # Register flows with the psi-pipeline work pool
├── docs/
│ ├── pipeline-architecture.md # Authoritative architecture, workflow table, "how to add a job"
│ ├── TODO.md # Active backlog
│ ├── data-flow-diagram.md # Visual data flow
│ └── PSI_Engineering_Knowledge_Graph_v2.md
├── pipeline/
│ ├── build_analytics_db.py # Main analytics builder (AFTEC CSV → psi_analytics.db → Azure SQL)
│ ├── build_comprehensive_dataset.py # AFTEC CSV loaders (imported by build_analytics_db.py)
│ ├── sync_aftec_raw.py # Raw AFTEC CSV → Azure SQL (no SQLite)
│ ├── sync_table_to_azure.py # Generic SQLite → Azure SQL sync (manifest-based incremental)
│ ├── sync_bom_data.py # UniData API batch BOM export → Azure SQL
│ ├── publish_ingest_tables.py # psi_ingest.db → psi_analytics.db copy
│ ├── verify_passthrough_tables.py # Row-count check across all three DBs
│ ├── load_prims_standalone.py # PRIMS PC fleet loader
│ ├── load_dept_schedules.py # Department schedule + floor space loader
│ ├── load_product_taxonomy.py # Product classes + catalog loader
│ ├── rockwell_lifecycle.py # Rockwell + GE + Emerson lifecycle scraper
│ ├── enrich_dell_support.py # Dell warranty CLI enrichment
│ ├── azure_sql.py # Azure SQL connection + bulk insert helper
│ └── cpq/ # Knowledge graph extraction (CPQ subsystem)
└── .github/workflows/ # 9 workflow files (see table above)
Environment
PS-PROXY (Windows Server):
- Python 3.12 via `actions/setup-python@v5`
- ODBC Driver 17/18 for SQL Server
- Network access to `\\ad.ptihome.com\DFS\*` shares (always use this path, never `\\fs1\` or `\\PS-GR-FS01\`)
- Drive mappings: `S:` = `\\ad.ptihome.com\DFS\DATA`, `I:` = Quotes
- Prefect server running locally at `127.0.0.1:4200` (no Prefect Cloud)
- Azure credentials from GHE Actions secrets (`AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`); setup-env-vars.yml also writes them as machine environment variables
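azure_sql.py holds the real connection helper and its internals aren't documented here. One common way to combine MSAL service-principal auth with pyodbc, sketched under that assumption, is to request a token for the `https://database.windows.net/.default` scope and hand it to the ODBC driver through the `SQL_COPT_SS_ACCESS_TOKEN` (1256) pre-connect attribute. The helper names are hypothetical; `msal` and `pyodbc` are imported inside `connect()` so the pure helpers run without them:

```python
import os
import struct

SQL_COPT_SS_ACCESS_TOKEN = 1256  # msodbcsql pre-connect attribute for Entra ID tokens
SERVER = "procserv-proddata.database.windows.net"
DATABASE = "PSI_Analytics"


def connection_string(driver: str = "ODBC Driver 18 for SQL Server") -> str:
    return (
        f"Driver={{{driver}}};Server=tcp:{SERVER},1433;Database={DATABASE};"
        "Encrypt=yes;TrustServerCertificate=no;"
    )


def pack_access_token(token: str) -> bytes:
    """Encode a token as the driver expects: 4-byte little-endian length + UTF-16-LE bytes."""
    raw = token.encode("utf-16-le")
    return struct.pack(f"<I{len(raw)}s", len(raw), raw)


def connect():
    import msal
    import pyodbc

    app = msal.ConfidentialClientApplication(
        os.environ["AZURE_CLIENT_ID"],
        authority=f"https://login.microsoftonline.com/{os.environ['AZURE_TENANT_ID']}",
        client_credential=os.environ["AZURE_CLIENT_SECRET"],
    )
    result = app.acquire_token_for_client(scopes=["https://database.windows.net/.default"])
    if "access_token" not in result:
        raise RuntimeError(result.get("error_description", "token request failed"))
    return pyodbc.connect(
        connection_string(),
        attrs_before={SQL_COPT_SS_ACCESS_TOKEN: pack_access_token(result["access_token"])},
    )
```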
Common operations
Manually trigger a workflow
$env:GH_HOST = 'progressivesurface.ghe.com'
gh workflow run "Sync PRIMS" --repo ProgressiveSurface/psi-data-pipeline
Force a re-sync (bypass manifest-based skip)
The incremental sync compares row_count:col_count signatures against the manifest and skips tables whose signature is unchanged. To force a full re-sync, delete the manifest:
rm "\\ad.ptihome.com\DFS\Schedule\SS123\LEADTIME\.azure_sync_manifest.json"
Add a new pipeline job
See docs/pipeline-architecture.md → “How to Add a New Pipeline Job” for the full walkthrough (5 steps: create loader, register in publish list, register in analytics build, create workflow, test).
After those steps, also add the job to prefect_deployments.py and re-run that script to register the deployment with the psi-pipeline work pool.
Check pipeline status
- Operator view: explorer.progressivesurface.com/#pipeline
- CLI: `gh run list --workflow=<workflow-name>` (set `GH_HOST=progressivesurface.ghe.com` first)
- Prefect UI: see the “Prefect UI” link on the dashboard
Known issues / class of bugs
The Azure SQL passthrough track has recurring data-fidelity bugs because each load_*.py defines its own table DDL and column widths:
- String truncation: source values exceed the guessed `NVARCHAR(N)` widths. Fix: widen the affected column, or use `NVARCHAR(MAX)` for unbounded text. Add the column names to the `long_cols` set in `build_analytics_db.py` and `sync_table_to_azure.py`.
- Unique-index conflicts: source data has duplicate keys that contradict the table’s declared uniqueness. Fix: deduplicate at load time, or drop the unique constraint if the source legitimately allows duplicates (see `product_catalog`).
- Phantom rows in Azure SQL: tables synced without a primary key accumulate stale rows when source records are deleted. Fix: implement a delete-missing pass in `sync_table_to_azure.py` for keyless tables.
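Without a primary key, a delete-missing pass can only compare whole rows. A minimal sketch, assuming the sync can read both the SQLite source rows and the Azure SQL target rows into memory (fine for small tables, costly for large ones) and that values are normalized to matching Python types first; `phantom_rows`, `delete_statement`, and `example_table` are hypothetical:

```python
from collections import Counter
from typing import Any, Iterable, Sequence


def phantom_rows(source_rows: Iterable[Sequence[Any]],
                 target_rows: Iterable[Sequence[Any]]) -> Counter:
    """Rows present in the target more often than in the source.

    Rows are compared as multisets: a row stored three times in Azure SQL
    but twice in SQLite yields one extra copy to delete.
    """
    return Counter(map(tuple, target_rows)) - Counter(map(tuple, source_rows))


def delete_statement(table: str, columns: Sequence[str], row: Sequence[Any], copies: int):
    """Build a parameterized T-SQL DELETE that removes `copies` matching rows."""
    predicates, params = [], []
    for col, value in zip(columns, row):
        if value is None:
            predicates.append(f"[{col}] IS NULL")  # `col = NULL` never matches
        else:
            predicates.append(f"[{col}] = ?")
            params.append(value)
    sql = f"DELETE TOP ({int(copies)}) FROM [{table}] WHERE " + " AND ".join(predicates)
    return sql, params
```

A caller would loop over `phantom_rows(src, tgt).items()` and pass each `(sql, params)` pair to `cursor.execute`. For tables small enough to reload quickly, truncating and re-inserting avoids the diff altogether.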
Long-term direction: centralize Azure SQL DDL in one location instead of per-script.
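One possible shape for that centralized DDL, sketched as a hypothetical `TableSpec` registry (neither the class nor the `example_parts` table exists in the repo): loaders and sync_table_to_azure.py would import the same spec, so column widths, the `long_cols` set, and keys are declared once instead of per script.

```python
from dataclasses import dataclass, field

LONG_TEXT = "NVARCHAR(MAX)"


@dataclass(frozen=True)
class TableSpec:
    """One table's Azure SQL schema, declared once for every script."""
    name: str
    columns: dict[str, str]  # column name -> SQL type, in table order
    primary_key: tuple[str, ...] = ()
    long_cols: frozenset[str] = field(default_factory=frozenset)

    def __post_init__(self) -> None:
        # SQL Server cannot use NVARCHAR(MAX) columns as index key columns.
        overlap = set(self.primary_key) & self.long_cols
        if overlap:
            raise ValueError(f"key columns cannot be {LONG_TEXT}: {sorted(overlap)}")

    def create_sql(self) -> str:
        # Columns listed in long_cols override whatever width `columns` guessed.
        parts = [
            f"[{col}] {LONG_TEXT if col in self.long_cols else sql_type}"
            for col, sql_type in self.columns.items()
        ]
        if self.primary_key:
            parts.append("PRIMARY KEY (" + ", ".join(f"[{c}]" for c in self.primary_key) + ")")
        return f"CREATE TABLE [{self.name}] (\n  " + ",\n  ".join(parts) + "\n)"


# Hypothetical registry entry; real tables and widths would come from the loaders.
TABLES = {
    spec.name: spec
    for spec in [
        TableSpec(
            name="example_parts",
            columns={"part_no": "NVARCHAR(50)", "notes": "NVARCHAR(200)"},
            primary_key=("part_no",),
            long_cols=frozenset({"notes"}),
        ),
    ]
}
```

Validating in `__post_init__` turns one class of deploy-time failure (a widened column that is also a key) into an import-time error.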
Related
- PSI Explorer: consumes the Azure SQL output; hosts the operational dashboard
- UniData API: source for live BOM extraction
- AFTEC Data Brain: schema and source-of-truth reference for AFTEC tables
- Fabric Progressive Dataset: Power BI dataset built from PSI_Analytics
- Repoint Power BI to Azure SQL: guide for migrating Power BI reports onto this pipeline