PSI Data Pipeline

System of scheduled jobs that move PSI manufacturing data from source systems (AFTEC, network shares, vendor APIs) into Azure SQL (PSI_Analytics), with SQLite cache layers and a live operational dashboard. Powers PSI Explorer analytics views, Power BI, and downstream consumers.


Overview

The PSI Data Pipeline is a Python-based ETL system running on PS-PROXY. Nine independent scheduled jobs read from AFTEC CSV exports, the UniData API, vendor sources (Rockwell, GE, and Emerson APIs; the Dell warranty CLI), and network-share spreadsheets, then land data in three storage tiers ending at Azure SQL.

Property | Value
Repository | ProgressiveSurface/psi-data-pipeline
Runner | PS-PROXY (self-hosted GHE Actions, [self-hosted, ps-proxy])
Orchestration | Prefect server + work pool (psi-pipeline) running locally on PS-PROXY
Target database | Azure SQL PSI_Analytics on procserv-proddata.database.windows.net
Live dashboard | explorer.progressivesurface.com/#pipeline
Stack | Python 3.12, SQLite, pyodbc, Prefect, MSAL service-principal auth

Architecture

Three storage tiers; data flows ingest → analytics → Azure SQL:

Tier | Path | Role | Lifecycle
psi_ingest.db | \\ad.ptihome.com\DFS\Schedule\SS123\LEADTIME\psi_ingest.db | Durable cache, scraper-owned | Persists across runs; preserves enrichment via INSERT OR REPLACE
psi_analytics.db | \\ad.ptihome.com\DFS\Schedule\SS123\LEADTIME\psi_analytics.db | Read-optimized analytics SQLite | Rebuilt nightly from AFTEC CSVs + imports from psi_ingest.db; deployed atomically via .tmp rename
PSI_Analytics | Azure SQL on procserv-proddata.database.windows.net | Production source of truth | Synced incrementally from the analytics tier; consumed by PSI Explorer, Power BI

Architecture doc in repo: docs/pipeline-architecture.md is authoritative for flow patterns, env vars, “how to add a new job”, and troubleshooting.

Data flow patterns

The pipeline has three distinct patterns:

Standard scraper:
    Source → load_*.py → psi_ingest.db → publish_ingest_tables.py
                       → psi_analytics.db → sync_table_to_azure.py
                       → Azure SQL → verify_passthrough_tables.py

Analytics build:
    AFTEC CSVs + psi_ingest.db imports → build_analytics_db.py
                       → psi_analytics.db
                       → (internal incremental Azure SQL sync, manifest-based)

Raw AFTEC:
    AFTEC CSV files → sync_aftec_raw.py → Azure SQL (no SQLite)
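The analytics build produces psi_analytics.db, which the tier table describes as deployed atomically via a .tmp rename. A minimal sketch of that pattern, assuming the builder writes a complete database to a sibling .tmp file and only then swaps it into place with os.replace; the deploy_atomically name and the build_fn callback are hypothetical, not build_analytics_db.py's actual code.

```python
import os
import sqlite3
from pathlib import Path
from typing import Callable


def deploy_atomically(target: Path, build_fn: Callable[[sqlite3.Connection], None]) -> None:
    """Build a fresh SQLite DB beside `target`, then swap it in with one rename.

    Readers see either the previous file or the finished new one, never a
    half-built database. A failed build leaves the live file untouched.
    """
    tmp = target.with_name(target.name + ".tmp")
    tmp.unlink(missing_ok=True)  # discard leftovers from a crashed run
    conn = sqlite3.connect(tmp)
    try:
        build_fn(conn)
        conn.commit()
    finally:
        # Release the handle first; renaming a still-open file typically fails on Windows.
        conn.close()
    os.replace(tmp, target)  # replaces target if it already exists
```

On Windows, os.replace may also raise PermissionError while a reader still has the target open, so a short retry around the swap may be worth adding.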

Orchestration

Three layers cooperate, and the executor is mid-migration from GitHub Actions runners to the Prefect work pool. Both run on PS-PROXY today.

Layer | Role
Prefect work pool (psi-pipeline) | Target executor. A Prefect worker on PS-PROXY picks up scheduled runs of the deployments registered by prefect_deployments.py and runs the flow directly. Every load_*.py / sync_*.py is decorated with @flow.
GitHub Actions | Legacy executor. Each workflow has its own cron: block and runs the same Python. Today this is still the primary scheduler. Goal: GH Actions stops scheduling so the runner is free for CI on push.
PSI Explorer dashboard (#pipeline) | Operator-facing UI. Reads the Prefect API and renders jobs, recent runs, and row counts.

Cron lives in two places today. Both .github/workflows/*.yml and prefect_deployments.py carry crons. As each job graduates to Prefect-only execution, remove the cron: block from the GHE workflow (keep workflow_dispatch: for manual triggers).
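While crons live in both places they can drift apart. A hedged sketch of a consistency check: it assumes each workflow file declares its schedule on a quoted `- cron: '...'` line, that workflow file stems match deployment names (as they do for the deployed jobs in the table below), and that the Prefect crons can be collected into a name-to-cron dict. That dict is a hypothetical shape; prefect_deployments.py may structure its deployments differently. A regex is used instead of a YAML parser so the check needs only the standard library.

```python
import re
from pathlib import Path

# Matches lines such as:    - cron: '0 6 * * *'    (single or double quotes)
CRON_LINE = re.compile(r"""^\s*-?\s*cron:\s*['"]([^'"]+)['"]""", re.MULTILINE)


def workflow_crons(yml_text: str) -> list[str]:
    """Every quoted cron expression declared in one workflow file."""
    return CRON_LINE.findall(yml_text)


def load_workflows(workflow_dir: Path) -> dict[str, str]:
    """Map workflow file stem (e.g. 'sync-prims') to its YAML text."""
    return {p.stem: p.read_text(encoding="utf-8") for p in workflow_dir.glob("*.yml")}


def find_drift(workflows: dict[str, str], prefect_crons: dict[str, str]) -> list[str]:
    """Report jobs whose GHE cron and Prefect cron disagree.

    Jobs with no cron: block left in GHE (already Prefect-only) are skipped.
    """
    problems = []
    for name, prefect_cron in sorted(prefect_crons.items()):
        gh_crons = workflow_crons(workflows.get(name, ""))
        if gh_crons and prefect_cron not in gh_crons:
            problems.append(f"{name}: GHE {gh_crons} vs Prefect {prefect_cron!r}")
    return problems
```

Usage: `find_drift(load_workflows(Path(".github/workflows")), prefect_crons)` returns an empty list when every job still scheduled in both places agrees.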


Active pipeline jobs

Workflow | Prefect deployment | Schedule (UTC) | Schedule (ET) | Duration | Tables owned
sync-aftec-raw.yml | sync-aftec-raw | 0 6 * * * | Daily 1 AM | ~2 hours | 31 raw AFTEC tables (tslabor2, openwo, partmaster, ecn, redbook, etc.)
nightly-data-build.yml (Build Analytics DB) | (none; see Monitoring gaps) | 30 6 * * * | Daily 1:30 AM | ~30 min | projects, labor_detail, redbooks, machine_dna, workforce_features, comprehensive_view, + all imported ingest tables
sync-kg-data.yml (Sync KG Data) | (none; see Monitoring gaps) | 0 7 * * * | Daily 2 AM | ~1 min | kg_* knowledge-graph tables (runs pipeline.cpq.ingest_to_graph)
sync-bom-data.yml | sync-bom-data | 0 8 * * 1-5 | Weekdays 3 AM | ~1 hour | bom_parts (subset for active projects, via UniData API batch endpoint)
sync-prims.yml | sync-prims | 0 5 * * 1-5 | Weekdays midnight | ~2 min | prims_installations
sync-dept-schedules.yml | sync-dept-schedules | 0 14 * * 1-5 | Weekdays 9 AM | ~2 min | dept_schedules, floor_space
sync-rockwell.yml | (none; see Monitoring gaps) | 0 10 * * 1 | Mondays 5 AM | ~5 min | component_lifecycle, project_components (Allen-Bradley + GE/Emerson)
sync-product-taxonomy.yml | sync-product-taxonomy | 0 7 * * 1 | Mondays 2 AM | ~2 min | product_classes, product_catalog
enrich-dell-warranty.yml | enrich-dell-warranty | 0 12 * * 6 | Saturdays 7 AM | ~5 min | (updates prims_installations dell_* columns)
setup-env-vars.yml | (none) | manual | manual | one-time | (none; writes Azure secrets as machine env vars)

ET times are Eastern Standard Time (UTC-5). Crons are in UTC, so while daylight saving time is in effect (UTC-4) each job starts one hour later on the local clock; for example, sync-aftec-raw runs at 2 AM instead of 1 AM.
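GHE Actions evaluates cron expressions in UTC, so the local ET time of each job moves by an hour across daylight saving changes. A small converter makes that explicit for any date. It hard-codes the US rule in force since 2007 (DST from 2:00 AM on the second Sunday of March to 2:00 AM on the first Sunday of November) instead of using zoneinfo, because zoneinfo on Windows hosts such as PS-PROXY depends on the separately installed tzdata package. The function names are illustrative.

```python
from datetime import date, datetime, time, timedelta, timezone

EST = timezone(timedelta(hours=-5), "EST")
EDT = timezone(timedelta(hours=-4), "EDT")


def nth_sunday(year: int, month: int, n: int) -> date:
    """Date of the n-th Sunday of a month (n >= 1)."""
    first = date(year, month, 1)
    days_to_sunday = (6 - first.weekday()) % 7  # weekday(): Monday=0 ... Sunday=6
    return first + timedelta(days=days_to_sunday + 7 * (n - 1))


def utc_to_eastern(when: datetime) -> datetime:
    """Convert an aware datetime to US Eastern local time (post-2007 DST rule)."""
    when = when.astimezone(timezone.utc)
    # DST starts at 02:00 EST == 07:00 UTC and ends at 02:00 EDT == 06:00 UTC.
    dst_start = datetime.combine(nth_sunday(when.year, 3, 2), time(7), tzinfo=timezone.utc)
    dst_end = datetime.combine(nth_sunday(when.year, 11, 1), time(6), tzinfo=timezone.utc)
    return when.astimezone(EDT if dst_start <= when < dst_end else EST)


def local_run_time(cron_hour: int, cron_minute: int, on: date) -> str:
    """Local ET wall-clock time of a UTC cron slot on a given date."""
    fire = datetime.combine(on, time(cron_hour, cron_minute), tzinfo=timezone.utc)
    return utc_to_eastern(fire).strftime("%H:%M %Z")


print(local_run_time(6, 0, date(2025, 1, 15)))  # 01:00 EST
print(local_run_time(6, 0, date(2025, 7, 15)))  # 02:00 EDT
```

The two calls correspond to the sync-aftec-raw slot (0 6 * * *) in winter and in summer.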

Monitoring gaps

  • nightly-data-build, sync-rockwell, and sync-kg-data are not Prefect deployments yet — they do not appear on the Explorer dashboard. Add to prefect_deployments.py and re-run that script.
  • nightly-data-build runs verify_passthrough_tables.py outside the Prefect flow, as a separate workflow step. The Prefect-tracked flow can therefore succeed while verify fails, so the dashboard shows “Healthy” while GHE Actions is red. Move verify inside a @flow so failures surface.
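verify_passthrough_tables.py compares row counts across all three tiers. Below is a hedged sketch of such a check written to raise on a mismatch rather than just report it: because Prefect marks a flow run Failed when the flow function raises, calling a check like this from inside the build's @flow would let the dashboard reflect a failed verify. The function names, the -1 sentinel for missing tables, and passing Azure counts in as a dict (for example, gathered with SELECT COUNT(*) over pyodbc) are illustrative assumptions, not the script's real interface.

```python
import sqlite3


class RowCountMismatch(RuntimeError):
    """Raised when a table's row count differs across the three tiers."""


def sqlite_counts(db_path: str, tables: list[str]) -> dict[str, int]:
    """Row counts for the given tables in one SQLite file (missing table -> -1)."""
    counts: dict[str, int] = {}
    conn = sqlite3.connect(db_path)
    try:
        for table in tables:
            try:
                # Table names come from pipeline config, not user input.
                counts[table] = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]
            except sqlite3.OperationalError:  # "no such table"
                counts[table] = -1
    finally:
        conn.close()
    return counts


def check_passthrough(
    ingest: dict[str, int], analytics: dict[str, int], azure: dict[str, int]
) -> None:
    """Raise if any ingest table's count disagrees with analytics or Azure SQL."""
    problems = []
    for table in sorted(ingest):
        trio = (ingest[table], analytics.get(table, -1), azure.get(table, -1))
        if len(set(trio)) != 1:
            problems.append(
                f"{table}: ingest={trio[0]} analytics={trio[1]} azure={trio[2]}"
            )
    if problems:
        raise RowCountMismatch("; ".join(problems))
```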

Data sources

Source | Records | What
AFTEC CSV exports | 9.25M+ rows / 31 tables | Daily exports from UniData/AFTEC (labor, work orders, BOM, parts, PO, redbook, ECN, etc.)
UniData API batch endpoint | 5.76M BOM rows | Live BOM exports for active projects
Redbook SQLite (redbook_coq.db) | 40K RFCs | AI-classified quality issues
PRIMS Installations XLS | 650 PCs | PSI machine PC fleet inventory
Department schedule .xlsm files | 7 files | Weekly capacity by department + floor space
Rockwell / GE / Emerson APIs | 488 components | Allen-Bradley + GE + Emerson product lifecycle status
Dell CLI | per-PC warranty | Service tag → warranty end, model, ship date
Product taxonomy CSVs | 437 classes + 20,654 products | Product class hierarchy and catalog
LDS Excel files | ~3,000 projects | Per-project Gantt schedules (extracted by extract_lds_gantt.py)
OneNote Lessons Learned | 872 projects | Post-project lessons (extracted from .one binary)
Handoff .docx notes | ~2,000 projects | Risk notes, comp-machine references (extract_handoff_notes)

Live dashboard

The PSI Explorer hosts a Data Pipeline dashboard at #pipeline that reads the Prefect API and renders:

  • The six currently deployed jobs, as cards with status badges (Healthy / Partial / Failed)
  • Per-job row counts and table counts
  • The 50 most recent runs, with state, start time, and duration
  • Summary counts (completed / failed / running)
  • Manual “Run” button per job
  • Link to the Prefect UI on PS-PROXY

Backend: analytics-routes.js in the psi-explorer-web repo queries the local Prefect API.
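When debugging the dashboard, its data can be reproduced against the same local API. The Python sketch below (the real backend is JavaScript) asks the Prefect REST API for the 50 most recent flow runs via POST /api/flow_runs/filter and tallies them by each run's state_type, roughly what the summary counts show. It is an illustration of the Prefect API, not the query analytics-routes.js actually sends; the helper names are hypothetical.

```python
import json
import urllib.request
from collections import Counter

PREFECT_API = "http://127.0.0.1:4200/api"  # local Prefect server on PS-PROXY


def recent_runs_request(limit: int = 50) -> urllib.request.Request:
    """Build (but do not send) a request for the most recent flow runs."""
    body = json.dumps({"limit": limit, "sort": "START_TIME_DESC"}).encode("utf-8")
    return urllib.request.Request(
        f"{PREFECT_API}/flow_runs/filter",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def summarize(runs: list[dict]) -> dict[str, int]:
    """Count runs per Prefect state type (COMPLETED, FAILED, RUNNING, ...)."""
    return dict(Counter(run.get("state_type") or "UNKNOWN" for run in runs))


def fetch_summary(limit: int = 50) -> dict[str, int]:
    """Send the request and summarize the returned list of flow runs."""
    with urllib.request.urlopen(recent_runs_request(limit), timeout=10) as resp:
        return summarize(json.load(resp))
```

fetch_summary() only works on PS-PROXY itself, since the Prefect server listens on 127.0.0.1.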


Repository layout

psi-data-pipeline/
├── CLAUDE.md                          # Cold-start orientation for AI sessions
├── README.md                          # Quick start + doc index
├── BUILD_LOG.md                       # Dated change log (append on every change)
├── prefect_deployments.py             # Register flows with the psi-pipeline work pool
├── docs/
│   ├── pipeline-architecture.md       # Authoritative architecture, workflow table, "how to add a job"
│   ├── TODO.md                        # Active backlog
│   ├── data-flow-diagram.md           # Visual data flow
│   └── PSI_Engineering_Knowledge_Graph_v2.md
├── pipeline/
│   ├── build_analytics_db.py          # Main analytics builder (AFTEC CSV → psi_analytics.db → Azure SQL)
│   ├── build_comprehensive_dataset.py # AFTEC CSV loaders (imported by build_analytics_db.py)
│   ├── sync_aftec_raw.py              # Raw AFTEC CSV → Azure SQL (no SQLite)
│   ├── sync_table_to_azure.py         # Generic SQLite → Azure SQL sync (manifest-based incremental)
│   ├── sync_bom_data.py               # UniData API batch BOM export → Azure SQL
│   ├── publish_ingest_tables.py       # psi_ingest.db → psi_analytics.db copy
│   ├── verify_passthrough_tables.py   # Row-count check across all three DBs
│   ├── load_prims_standalone.py       # PRIMS PC fleet loader
│   ├── load_dept_schedules.py         # Department schedule + floor space loader
│   ├── load_product_taxonomy.py       # Product classes + catalog loader
│   ├── rockwell_lifecycle.py          # Rockwell + GE + Emerson lifecycle scraper
│   ├── enrich_dell_support.py         # Dell warranty CLI enrichment
│   ├── azure_sql.py                   # Azure SQL connection + bulk insert helper
│   └── cpq/                           # Knowledge graph extraction (CPQ subsystem)
└── .github/workflows/                 # Workflow files, one per row of the Active pipeline jobs table

Environment

PS-PROXY (Windows Server):

  • Python 3.12 via actions/setup-python@v5
  • ODBC Driver 17/18 for SQL Server
  • Network access to \\ad.ptihome.com\DFS\* shares (always use this path, never \\fs1\ or \\PS-GR-FS01\)
  • Drive mappings: S: = \\ad.ptihome.com\DFS\DATA, I: = Quotes
  • Prefect server local at 127.0.0.1:4200 (no Prefect Cloud)
  • Azure credentials from GHE Actions secrets (AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET); setup-env-vars.yml also writes them as machine env vars
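The three AZURE_* values are enough for service-principal access to Azure SQL. A sketch of opening such a connection with MSAL and pyodbc, passing the token through the SQL_COPT_SS_ACCESS_TOKEN pre-connect attribute (1256) that the Microsoft ODBC driver accepts. This follows the commonly documented token-injection pattern and is not necessarily how azure_sql.py does it. msal and pyodbc are imported inside the function so the token-packing helper can be used and tested on its own.

```python
import os
import struct

SQL_COPT_SS_ACCESS_TOKEN = 1256  # pre-connect attribute defined by the MS ODBC driver
SERVER = "procserv-proddata.database.windows.net"
DATABASE = "PSI_Analytics"


def pack_access_token(token: str) -> bytes:
    """Encode an access token the way the ODBC driver expects:
    a 4-byte little-endian length followed by the UTF-16-LE token bytes."""
    raw = token.encode("utf-16-le")
    return struct.pack(f"<I{len(raw)}s", len(raw), raw)


def connect_azure_sql(driver: str = "ODBC Driver 18 for SQL Server"):
    """Open a pyodbc connection authenticated as the service principal."""
    # Imported lazily so pack_access_token works without these packages installed.
    import msal
    import pyodbc

    app = msal.ConfidentialClientApplication(
        os.environ["AZURE_CLIENT_ID"],
        authority=f"https://login.microsoftonline.com/{os.environ['AZURE_TENANT_ID']}",
        client_credential=os.environ["AZURE_CLIENT_SECRET"],
    )
    result = app.acquire_token_for_client(scopes=["https://database.windows.net/.default"])
    if "access_token" not in result:
        raise RuntimeError(f"token request failed: {result.get('error_description')}")
    # No UID/PWD/Authentication keywords: the token replaces them.
    conn_str = (
        f"Driver={{{driver}}};Server=tcp:{SERVER},1433;Database={DATABASE};"
        "Encrypt=yes;TrustServerCertificate=no;"
    )
    return pyodbc.connect(
        conn_str,
        attrs_before={SQL_COPT_SS_ACCESS_TOKEN: pack_access_token(result["access_token"])},
    )
```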

Common operations

Manually trigger a workflow

$env:GH_HOST = 'progressivesurface.ghe.com'
gh workflow run "Sync PRIMS" --repo ProgressiveSurface/psi-data-pipeline

Force a re-sync (bypass manifest-based skip)

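The incremental sync records a row_count:col_count signature for each table in a manifest file and skips tables whose signature has not changed. To force a full re-sync, delete the manifest: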

rm "\\ad.ptihome.com\DFS\Schedule\SS123\LEADTIME\.azure_sync_manifest.json"
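A sketch of how a row_count:col_count manifest check can work, assuming the manifest is a JSON object mapping table name to its last-synced signature (the real file's layout may differ, and the function names are hypothetical). One consequence follows from the signature itself: it changes only when rows or columns are added or removed, so an in-place update that leaves both counts unchanged (for example, enrich-dell-warranty rewriting prims_installations dell_* values) would be skipped until the manifest is deleted.

```python
import json
import sqlite3
from pathlib import Path


def table_signature(conn: sqlite3.Connection, table: str) -> str:
    """'row_count:col_count' for one SQLite table."""
    rows = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]
    cols = len(conn.execute(f'PRAGMA table_info("{table}")').fetchall())
    return f"{rows}:{cols}"


def _read_manifest(path: Path) -> dict[str, str]:
    return json.loads(path.read_text(encoding="utf-8")) if path.exists() else {}


def tables_to_sync(conn: sqlite3.Connection, tables: list[str], manifest_path: Path) -> list[str]:
    """Tables whose signature differs from the manifest (all of them if it is missing)."""
    manifest = _read_manifest(manifest_path)
    return [t for t in tables if manifest.get(t) != table_signature(conn, t)]


def record_synced(conn: sqlite3.Connection, tables: list[str], manifest_path: Path) -> None:
    """After a successful sync, store the new signatures."""
    manifest = _read_manifest(manifest_path)
    manifest.update({t: table_signature(conn, t) for t in tables})
    manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True), encoding="utf-8")
```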

Add a new pipeline job

See docs/pipeline-architecture.md → “How to Add a New Pipeline Job” for the full walkthrough (5 steps: create loader, register in publish list, register in analytics build, create workflow, test).

Then add the job to prefect_deployments.py and re-run that script to register the deployment with the psi-pipeline work pool.

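Check pipeline status

For Prefect-deployed jobs, use the Data Pipeline dashboard at explorer.progressivesurface.com/#pipeline or the Prefect UI on PS-PROXY (127.0.0.1:4200). Jobs that are not yet Prefect deployments (nightly-data-build, sync-rockwell, sync-kg-data) report status only in GHE Actions.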

Known issues / class of bugs

The Azure SQL passthrough track has recurring data-fidelity bugs because each load_*.py defines its own table DDL and column widths:

  • String truncation: source values exceed guessed NVARCHAR(N) widths. Fix: widen the affected column, or use NVARCHAR(MAX) for unbounded text, and add the column names to the long_cols set in build_analytics_db.py and sync_table_to_azure.py.
  • Unique-index conflicts — source data has duplicate keys that contradict the table’s declared uniqueness. Fix: deduplicate at load time, or drop the unique constraint if the source legitimately allows duplicates (see product_catalog).
  • Phantom rows in Azure SQL — tables synced without a primary key accumulate stale rows when source records are deleted. Fix: implement a delete-missing pass in sync_table_to_azure.py for keyless tables.
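For the phantom-row case, one way to run a delete-missing pass on a keyless table is to treat the whole row as its key and compare source and target as multisets, so legitimate duplicate rows keep the right multiplicity. The sketch below computes which target rows to delete and renders NULL-safe T-SQL DELETE statements; the function names and table name are hypothetical, row fetching and execution are left out, and the generated SQL has not been run against PSI_Analytics. For large tables, truncate-and-reload can be simpler than a row-level diff.

```python
from collections import Counter
from collections.abc import Hashable, Iterable

Row = tuple[Hashable, ...]


def rows_to_delete(source_rows: Iterable[Row], target_rows: Iterable[Row]) -> Counter:
    """Full rows present in the target more times than in the source.

    Counter subtraction keeps only positive counts, so rows the target is
    missing (which the normal sync inserts) are ignored here.
    """
    return Counter(target_rows) - Counter(source_rows)


def delete_statements(table: str, columns: list[str], doomed: Counter) -> list[tuple[str, list]]:
    """One parameterized T-SQL DELETE per distinct doomed row."""
    # "col = ?" never matches NULL, so also accept column and parameter both NULL.
    where = " AND ".join(f"([{c}] = ? OR ([{c}] IS NULL AND ? IS NULL))" for c in columns)
    sql = f"DELETE TOP (?) FROM [dbo].[{table}] WHERE {where}"
    statements = []
    for row, copies in doomed.items():
        params: list = [copies]  # TOP (n) removes only the surplus copies
        for value in row:
            params.extend([value, value])
        statements.append((sql, params))
    return statements
```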

Long-term direction: centralize Azure SQL DDL in one location instead of per-script.