EventHub
Stakeholders
External Recipient: Backend team (internal)
Product Owner: [internal Name/Team]
Technical Owner: Data Engineering
Purpose
Data products consumed by the Backend team, delivered as Pub/Sub events and file exports derived from dbt models. EventHub covers both real-time delta-based events and scheduled file deliveries.
File Deliveries
CIP Verified Users
Daily export of users eligible for CIP verification, delivered as a CSV to Azure Blob Storage.
What it delivers: A list of user_ids for users who have user_state = 'Active' and kyc_cip_match_status = '3 out of 3'.
| dbt model | rpt_backend_cip_eligibility |
| DAG | backend_cip_verified_users_azure |
| Schedule | 50 7 * * * (07:50 UTC daily) |
| Destination | Azure Blob Storage — account: prodpspfunding, container: pspfunding-cip-verifications |
| Filename | cip_verified_users_{YYYYMMDD}.csv |
| Repos | dt-airflow-dags (DAG), dt-reporting (build & upload) |
Pipeline:
- DAG runs
dbt buildonrpt_backend_cip_eligibility(BigQuery, federated target) dt-reportingexports the table as a full CSV report- File is uploaded to Azure Blob Storage
Events
Naming conventions and contents of the events have been defined as per the requirements set by the consumers (Backend team).
| DAG | internal_events_pubsub |
| Schedule | Triggered by dbt-structure-daily DAG completion Airflow DAG |
| Weekend Delivery | [Yes/No] |
| dbt schema | reverse_etl |
| dbt folder | models/reverse_etl |
| Repos | dt-airflow-dags (DAG), dt-bq2pubsub (publish) |
Data
Each event has three dbt models following the pattern: internal_<event_name>, internal_<event_name>_delta, and internal_<event_name>_previous_run.
PspRiskScoreUpdatedEvent
One row per user whose PSP risk score changed.
| Event name | Data.PspRiskScoreUpdatedEvent |
| dbt models | internal_psp_risk_score_updated_event, _delta, _previous_run |
| Payload | See below |
{
"advisorId": null,
"eventCreated": "2025-10-20T14:59:30.836894+00:00",
"eventData": {
"risk_groups": "Medium Risk",
"risk_score": 0.6,
"total_points": 8
},
"eventId": "033f7f66-2616-4296-a547-2fe7b45d7f5e",
"eventName": "Data.PspRiskScoreUpdatedEvent",
"externalReferenceId": null,
"publishedTime": "2025-10-20T14:59:30.836894+00:00",
"userId": "3b40a2d3-936e-46a4-bfbd-288ff8d4e826"
}
CrdeUserModelUpdatedEvent
One row per user whose credit/risk decision model changed.
| Event name | Data.CrdeUserModelUpdatedEvent |
| dbt models | internal_crde_user_model_updated_event, _delta, _previous_run |
| Payload | See below |
{
"advisorId": null,
"eventCreated": "2025-10-20T14:57:41.962859+00:00",
"eventData": {
"aml_sar_status": false,
"avg_money_in_180": 0,
"num_disputed_transactions_180": 0,
"service_score": 8,
"sum_disputed_amount_180": 0,
"total_money_loss_180": 0,
"user_credit_segment": "New_To_Platform",
"user_id": "b956d91d-34da-4ded-8065-aed4bbd472cf",
"user_state": "Active"
},
"eventId": "03f8725d-1841-4e11-a46b-bc89c6a274d3",
"eventName": "Data.CrdeUserModelUpdatedEvent",
"externalReferenceId": null,
"publishedTime": "2025-10-20T14:57:41.962859+00:00",
"userId": "b956d91d-34da-4ded-8065-aed4bbd472cf"
}
CreditEligibilityDataUpdatedEvent
One row per user whose credit eligibility data changed.
| Event name | Data.CreditEligibilityDataUpdatedEvent |
| dbt models | internal_credit_eligibility_data_updated_event, _delta, _previous_run |
| Payload | See below |
{
"advisorId": null,
"eventCreated": "2025-10-20T14:58:17.289648+00:00",
"eventData": {
"aml_sar_status": false,
"total_money_loss_180": 0,
"user_id": "2a57397e-6a45-48f5-ab2f-9e585725c095"
},
"eventId": "7ba99b71-f598-4522-be9d-50772f5bf7f4",
"eventName": "Data.CreditEligibilityDataUpdatedEvent",
"externalReferenceId": null,
"publishedTime": "2025-10-20T14:58:17.289648+00:00",
"userId": "2a57397e-6a45-48f5-ab2f-9e585725c095"
}
UserRiskLevelUpdatedEvent
One row per user whose risk level changed.
| Event name | Data.UserRiskLevelUpdatedEvent |
| dbt models | internal_user_risk_level_updated_event, _delta, _previous_run |
| Payload | See below |
{
"advisorId": null,
"eventCreated": "2025-10-20T14:58:17.289648+00:00",
"eventData": {
"customer_risk_rating": "Medium",
"diaspora_risk_level": "High",
"user_id": "2a57397e-6a45-48f5-ab2f-9e585725c095"
},
"eventId": "7ba99b71-f598-4522-be9d-50772f5bf7f4",
"eventName": "Data.UserRiskLevelUpdatedEvent",
"externalReferenceId": null,
"publishedTime": "2025-10-20T14:58:17.289648+00:00",
"userId": "2a57397e-6a45-48f5-ab2f-9e585725c095"
}
CheckRiskLevelUpdatedEvent
One row per user whose check risk level changed.
| Event name | Data.CheckRiskLevelUpdatedEvent |
| dbt models | internal_check_risk_level_updated_event, _delta, _previous_run |
| Payload | See below |
{
"advisorId": null,
"eventCreated": "2025-10-20T14:58:17.289648+00:00",
"eventData": {
"risk_level": "High",
"user_id": "2a57397e-6a45-48f5-ab2f-9e585725c095"
},
"eventId": "7ba99b71-f598-4522-be9d-50772f5bf7f4",
"eventName": "Data.CheckRiskLevelUpdatedEvent",
"externalReferenceId": null,
"publishedTime": "2025-10-20T14:58:17.289648+00:00",
"userId": "2a57397e-6a45-48f5-ab2f-9e585725c095"
}
Publishing Flow
graph TB
Credit[Credit Event<br/>Publisher DAG]
Credit --> Base[(update<br/>Base Table)]
Base --> D1[(payload Delta<br/>event 1)]
Base --> D2[(payload Delta<br/>event 2)]
D1 --> Get[Get payload<br/>data<br/>from BQ<br/>per event]
D2 --> Get
Get --> Create[Create Event<br/>Messages]
Create --> Validate[Validate<br/>All Messages]
Validate -->Publish[Publish<br/>to Pub/Sub]
Publish --> Update[Update<br/>Archive]
Update --> Success[Complete]
- Run Daily Models — Daily models run in the morning (Swedish timezone) via the
dbt-structure-dailyDAG in Airflow. - Trigger Event Publisher — Once daily models complete, they trigger the
internal_events_pubsubDAG. - Calculate Base & Delta Tables — The DAG runs dbt models tagged with
EventHub: calculates the base tables (internal_<event_name>), then creates delta tables (internal_<event_name>_delta) comparing current fields vs. last run. Delta contains only fields meant for event payloads. - Publish Events — One event is published to Pub/Sub for each row in the delta table.
- Update Archive — The DAG updates the previous run table (
internal_<event_name>_previous_run) holding the current state of the base table, serving as the benchmark for the next comparison run.