EventHub

Stakeholders

External Recipient: Backend team (internal)

Product Owner: [internal Name/Team]

Technical Owner: Data Engineering

Purpose

EventHub is the set of data products consumed by the Backend team, delivered as Pub/Sub events and file exports derived from dbt models. It covers both delta-based events, published after the daily dbt run, and scheduled file deliveries.


File Deliveries

CIP Verified Users

Daily export of users eligible for CIP verification, delivered as a CSV to Azure Blob Storage.

What it delivers: A list of user_ids for users who have user_state = 'Active' and kyc_cip_match_status = '3 out of 3'.

dbt model: rpt_backend_cip_eligibility
DAG: backend_cip_verified_users_azure
Schedule: 50 7 * * * (07:50 UTC daily)
Destination: Azure Blob Storage (account: prodpspfunding, container: pspfunding-cip-verifications)
Filename: cip_verified_users_{YYYYMMDD}.csv
Repos: dt-airflow-dags (DAG), dt-reporting (build & upload)

Pipeline:

  1. DAG runs dbt build on rpt_backend_cip_eligibility (BigQuery, federated target)
  2. dt-reporting exports the table as a full CSV report
  3. File is uploaded to Azure Blob Storage
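
The eligibility rule and the filename pattern can be illustrated in a few lines of Python. This is a minimal sketch, not the dt-reporting implementation: the function names, the in-memory rows, and the single user_id header column are assumptions made for illustration, and in production the filter lives in the rpt_backend_cip_eligibility dbt model.

```python
import csv
import io
from datetime import date


def cip_filename(run_date: date) -> str:
    """Build the export filename from the documented pattern."""
    return f"cip_verified_users_{run_date:%Y%m%d}.csv"


def eligible_user_ids(rows):
    """Return user_ids that satisfy the documented eligibility rule."""
    return [
        row["user_id"]
        for row in rows
        if row["user_state"] == "Active"
        and row["kyc_cip_match_status"] == "3 out of 3"
    ]


def to_csv(user_ids) -> str:
    """Serialize user_ids as CSV text (the header row is an assumption)."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["user_id"])
    writer.writerows([uid] for uid in user_ids)
    return buf.getvalue()
```

Calling cip_filename(date(2025, 10, 20)) yields cip_verified_users_20251020.csv, which matches the Filename entry above.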

Events

Event names and payload contents follow the requirements set by the consumer (Backend team).

DAG: internal_events_pubsub
Schedule: Triggered on completion of the dbt-structure-daily Airflow DAG
Weekend Delivery: [Yes/No]
dbt schema: reverse_etl
dbt folder: models/reverse_etl
Repos: dt-airflow-dags (DAG), dt-bq2pubsub (publish)

Data

Each event has three dbt models following the pattern: internal_<event_name>, internal_<event_name>_delta, and internal_<event_name>_previous_run.
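
As an illustration of the naming pattern, the three model names can be derived from an event's class name. This is a sketch only: the helper model_names is hypothetical, and the CamelCase to snake_case conversion is inferred from the model names listed for each event below rather than taken from the dbt project.

```python
import re


def model_names(event_name: str) -> dict:
    """Derive the three dbt model names for an event such as 'PspRiskScoreUpdatedEvent'."""
    # Insert an underscore before every capital letter except the first, then lowercase.
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", event_name).lower()
    base = f"internal_{snake}"
    return {
        "base": base,
        "delta": f"{base}_delta",
        "previous_run": f"{base}_previous_run",
    }
```

For PspRiskScoreUpdatedEvent this gives internal_psp_risk_score_updated_event, internal_psp_risk_score_updated_event_delta, and internal_psp_risk_score_updated_event_previous_run.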

PspRiskScoreUpdatedEvent

One row per user whose PSP risk score changed.

Event name: Data.PspRiskScoreUpdatedEvent
dbt models: internal_psp_risk_score_updated_event, _delta, _previous_run
Payload: see below
{
  "advisorId": null,
  "eventCreated": "2025-10-20T14:59:30.836894+00:00",
  "eventData": {
    "risk_groups": "Medium Risk",
    "risk_score": 0.6,
    "total_points": 8
  },
  "eventId": "033f7f66-2616-4296-a547-2fe7b45d7f5e",
  "eventName": "Data.PspRiskScoreUpdatedEvent",
  "externalReferenceId": null,
  "publishedTime": "2025-10-20T14:59:30.836894+00:00",
  "userId": "3b40a2d3-936e-46a4-bfbd-288ff8d4e826"
}

CrdeUserModelUpdatedEvent

One row per user whose credit/risk decision model changed.

Event name: Data.CrdeUserModelUpdatedEvent
dbt models: internal_crde_user_model_updated_event, _delta, _previous_run
Payload: see below
{
  "advisorId": null,
  "eventCreated": "2025-10-20T14:57:41.962859+00:00",
  "eventData": {
    "aml_sar_status": false,
    "avg_money_in_180": 0,
    "num_disputed_transactions_180": 0,
    "service_score": 8,
    "sum_disputed_amount_180": 0,
    "total_money_loss_180": 0,
    "user_credit_segment": "New_To_Platform",
    "user_id": "b956d91d-34da-4ded-8065-aed4bbd472cf",
    "user_state": "Active"
  },
  "eventId": "03f8725d-1841-4e11-a46b-bc89c6a274d3",
  "eventName": "Data.CrdeUserModelUpdatedEvent",
  "externalReferenceId": null,
  "publishedTime": "2025-10-20T14:57:41.962859+00:00",
  "userId": "b956d91d-34da-4ded-8065-aed4bbd472cf"
}

CreditEligibilityDataUpdatedEvent

One row per user whose credit eligibility data changed.

Event name: Data.CreditEligibilityDataUpdatedEvent
dbt models: internal_credit_eligibility_data_updated_event, _delta, _previous_run
Payload: see below
{
  "advisorId": null,
  "eventCreated": "2025-10-20T14:58:17.289648+00:00",
  "eventData": {
    "aml_sar_status": false,
    "total_money_loss_180": 0,
    "user_id": "2a57397e-6a45-48f5-ab2f-9e585725c095"
  },
  "eventId": "7ba99b71-f598-4522-be9d-50772f5bf7f4",
  "eventName": "Data.CreditEligibilityDataUpdatedEvent",
  "externalReferenceId": null,
  "publishedTime": "2025-10-20T14:58:17.289648+00:00",
  "userId": "2a57397e-6a45-48f5-ab2f-9e585725c095"
}

UserRiskLevelUpdatedEvent

One row per user whose risk level changed.

Event name: Data.UserRiskLevelUpdatedEvent
dbt models: internal_user_risk_level_updated_event, _delta, _previous_run
Payload: see below
{
  "advisorId": null,
  "eventCreated": "2025-10-20T14:58:17.289648+00:00",
  "eventData": {
    "customer_risk_rating": "Medium",
    "diaspora_risk_level": "High",
    "user_id": "2a57397e-6a45-48f5-ab2f-9e585725c095"
  },
  "eventId": "7ba99b71-f598-4522-be9d-50772f5bf7f4",
  "eventName": "Data.UserRiskLevelUpdatedEvent",
  "externalReferenceId": null,
  "publishedTime": "2025-10-20T14:58:17.289648+00:00",
  "userId": "2a57397e-6a45-48f5-ab2f-9e585725c095"
}

CheckRiskLevelUpdatedEvent

One row per user whose check risk level changed.

Event name: Data.CheckRiskLevelUpdatedEvent
dbt models: internal_check_risk_level_updated_event, _delta, _previous_run
Payload: see below
{
  "advisorId": null,
  "eventCreated": "2025-10-20T14:58:17.289648+00:00",
  "eventData": {
    "risk_level": "High",
    "user_id": "2a57397e-6a45-48f5-ab2f-9e585725c095"
  },
  "eventId": "7ba99b71-f598-4522-be9d-50772f5bf7f4",
  "eventName": "Data.CheckRiskLevelUpdatedEvent",
  "externalReferenceId": null,
  "publishedTime": "2025-10-20T14:58:17.289648+00:00",
  "userId": "2a57397e-6a45-48f5-ab2f-9e585725c095"
}
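
All five sample payloads above share the same envelope: eight top-level keys, an eventName prefixed with Data., UUID identifiers, and ISO 8601 timestamps with a UTC offset. The sketch below builds and checks such an envelope. It is not the dt-bq2pubsub code: build_event, validate_event, and the specific checks are assumptions chosen to mirror the samples, and setting eventCreated equal to publishedTime simply copies what the samples show.

```python
import uuid
from datetime import datetime, timezone
from typing import List, Optional

ENVELOPE_KEYS = {
    "advisorId", "eventCreated", "eventData", "eventId",
    "eventName", "externalReferenceId", "publishedTime", "userId",
}


def build_event(event_name: str, user_id: str, event_data: dict,
                now: Optional[datetime] = None) -> dict:
    """Wrap event_data in the envelope seen in the sample payloads."""
    ts = (now or datetime.now(timezone.utc)).isoformat()
    return {
        "advisorId": None,
        "eventCreated": ts,
        "eventData": event_data,
        "eventId": str(uuid.uuid4()),
        "eventName": f"Data.{event_name}",
        "externalReferenceId": None,
        "publishedTime": ts,
        "userId": user_id,
    }


def validate_event(event: dict) -> List[str]:
    """Return a list of problems; an empty list means these checks passed."""
    errors = [f"missing key: {k}" for k in sorted(ENVELOPE_KEYS - set(event))]
    if errors:
        return errors
    if not str(event["eventName"]).startswith("Data."):
        errors.append("eventName must start with 'Data.'")
    for key in ("eventId", "userId"):
        try:
            uuid.UUID(event[key])
        except (ValueError, TypeError, AttributeError):
            errors.append(f"{key} is not a UUID")
    for key in ("eventCreated", "publishedTime"):
        try:
            datetime.fromisoformat(event[key])
        except (ValueError, TypeError):
            errors.append(f"{key} is not an ISO 8601 timestamp")
    if not isinstance(event["eventData"], dict) or not event["eventData"]:
        errors.append("eventData must be a non-empty object")
    return errors
```

Returning a list of problems rather than raising on the first one makes it possible to validate a whole batch and report every failure before anything is published.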

Publishing Flow

graph TB
    Credit[Credit Event<br/>Publisher DAG]

    Credit --> Base[(update<br/>Base Table)]

    Base --> D1[(payload Delta<br/>event 1)]
    Base --> D2[(payload Delta<br/>event 2)]

    D1 --> Get[Get payload<br/>data<br/>from BQ<br/>per event]
    D2 --> Get

    Get --> Create[Create Event<br/>Messages]
    Create --> Validate[Validate<br/>All Messages]

    Validate --> Publish[Publish<br/>to Pub/Sub]

    Publish --> Update[Update<br/>Archive]

    Update --> Success[Complete]
  1. Run Daily Models: Daily models run in the morning (Swedish timezone) via the dbt-structure-daily DAG in Airflow.
  2. Trigger Event Publisher: When the daily models complete, dbt-structure-daily triggers the internal_events_pubsub DAG.
  3. Calculate Base & Delta Tables: The DAG runs the dbt models tagged with EventHub. It first builds the base tables (internal_<event_name>), then builds the delta tables (internal_<event_name>_delta) by comparing current field values against the previous run. Each delta table contains only the fields meant for the event payload.
  4. Publish Events: One event is published to Pub/Sub for each row in the delta table.
  5. Update Archive: The DAG updates the previous-run table (internal_<event_name>_previous_run) with the current state of the base table, which serves as the baseline for the next comparison.
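
The base, delta, and archive steps can be sketched in plain Python to make the comparison logic concrete. This is an illustration under stated assumptions, not the production code: in the real pipeline the delta is computed by dbt models in BigQuery, whereas here dictionaries keyed by user_id stand in for the tables, publish is a stub callback instead of a Pub/Sub client, and treating a user with no previous row as changed is an assumption.

```python
def compute_delta(current: dict, previous: dict, payload_fields: list) -> list:
    """Return one row per user whose payload fields differ from the previous run."""
    delta = []
    for user_id, row in current.items():
        payload = {f: row[f] for f in payload_fields}
        old = previous.get(user_id)
        # Assumption: a user with no previous row counts as changed.
        if old is None or {f: old.get(f) for f in payload_fields} != payload:
            delta.append({"user_id": user_id, **payload})
    return delta


def run_once(current: dict, previous: dict, payload_fields: list, publish):
    """Publish one event per delta row, then return the new archive and the event count."""
    delta = compute_delta(current, previous, payload_fields)
    for row in delta:
        publish(row)
    # Archive step: the current base state becomes the baseline for the next run.
    new_previous = {uid: dict(row) for uid, row in current.items()}
    return new_previous, len(delta)
```

Running run_once a second time with unchanged data publishes nothing, because the archive written by the first run already matches the base table.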