When building a data warehouse
(DWH), most of the time one of the main data sources will be one or more external transactional databases. Setting up incremental staging for hundreds of operational tables can be a cumbersome task which everyone wants to avoid.
Generally, we want to transfer the external databases into staging in their exact form with minimal transformations, since they are usually done in the later stages of a DWH development. The process then sounds simple enough, just transfer an external database to staging, right? Sadly, the SSIS
is not happy about runtime metadata changes and without explicit mappings for each table it won’t work, at least not out-of-the-box.
In this article we'll investigate the automating of this process which leaves more time for other areas that can't be as easily automated.
The primary goal is to avoid doing the same work twice, and that can be achieved by using existing database metadata, mainly database schema and unique indexes (clustered or non-clustered). The solution makes use of dynamic SQL and CozyRoc's SSIS+ Components Suite to provide support for dynamic SSIS metadata, which is a big issue for the vanilla SSIS toolbox. Dynamic SQL
provides support on database level, creating staging tables and preparing statements, while SSIS+ provides support for dynamic runtime metadata in SSIS.
The support for dynamic metadata could also be achieved by using SSIS Script task and having a C# developer at hand, but generally it is cheaper to buy a license for something that’s already been developed.
Extracting Database Schema and Creating Staging
To provide a proof of concept, we needed to create a solution that can transfer the databases from approximately thousand Informix servers to a single SQL Server instance that will be used as staging. Databases generally have the same schema but each contains its own operational data.
The first step is always analyzing the source data and planning. During analysis, it became apparent that tables don't follow the same structure and we'd need different ways of handling data load.
Tables could be described in one or more of the following ways: incremental/non-incremental key, versioned table, composite key and “large“ / “small“ table. To support this, we decided to implement multiple data load variations and enable switching of the table from one process to another by updating codebooks, therefore allowing easier process manipulation without having to change and re-deploy SSIS packages each time a change is needed. For now, we have three different processes:
- Full table load – fetches the whole table from the source and overwrites existing data
- Incremental load – makes use of incremental keys and only fetches new data each time; doesn't check if existing rows were updated or deleted
- Lookup load – used for tables that have a composite primary key; source and staging data are sorted by key columns and compared; larger tables should have a RowVersion column since otherwise this solution might not be viable for them
We need a few tables to store the required metadata, specifically:
- Servers table – contains a list of servers included in the process
- ServerName – describes name of the server, unique
- Username, Password, ConnectionString
- depending on IT infrastructure, you'll need these to connect to the server
- in our case, DSN were set up beforehand via ODBC Data Sources tool, so we only needed ConnectionString referring to the specific server's DSN
- IncludeInLoad – general Boolean flag to allow for exclusion of specific servers
- Tables table – contains a list of tables (per server) included in the process
- ServerName, SchemaName, TableName – uniquely describes a source table
- ColumnCount, RowCount, Version – for future reference, provided by Informix internal tables
- IncludeInFullLoad, IncludeInLookupLoad, IncludeInIncrementalLoad – flags used to determine which process should be used to load a specific table's data
- Columns table – lists table columns per server
- ServerName, SchemaName, TableName, ColumnName - uniquely describes a source column
- IBM_ColNo, ColType, DataType, ColLength, ColMin, ColMax, Nullable – Informix metadata that enables the creation of columns of the same type and size in SQL Server variant
- Incremental tables – subset table of Tables; used to track incremental key name and last loaded value
- TableLoadStatus – used to track load status of each table (InProgress, Completed, Failed) and operation row count
- SSISErrorLog – logs any errors SSIS tasks throw, using OnEvent handler
Keep in mind that this is just a short overview of the tables which describes the setup in general; some internal columns were omitted for the sake of simplicity.
Beside the tables, we need a few stored procedures that will dynamically create staging tables:
- meta.csp_FetchServersForLoad – generates a list of servers that are included in the load
- meta.csp_CreateMissingObjects (@ServerName) – creates schemas and tables based on server metadata; if a table was already created for another server, it will be skipped since the procedure assumes that tables are identical
- meta.csp_CreateMissingColumns (@ServerName) – creates new or missing columns for given server name; e.g. if a column was added on source table or table definition differs between servers
Staging tables that are created have a similar definition as source tables with a few exceptions. Large texts or binary columns are skipped as we don't expect they would contain DWH-relevant data and any exception to this rule would need to be analyzed and handled manually.
Besides source table definition, we add meta columns that help with data tracking. Most important columns are ServerName, which tells us from which server data was pulled from, and RowStatus, which tells us if a certain row was Inserted, Updated or Deleted. In case of Update or Delete, staging table will contain multiple versions of that row for each status change.
In our case, in a later iteration of the project we concluded that rows aren’t deleted from tables and there’s no reason to track deleted rows. This enabled us to use Informix’s cdrtime column, which is a hidden column that provides Unix timestamp of when the row was last modified (insert or update). Dynamic queries for incremental and lookup load were modified to also consider cdrtime to greatly reduce amount of data being fetched from source servers.
The primary key of each staging table is (ServerName,StagingID) which allows for parallel table inserts and reads while avoiding table locks. Such composite key will introduce the problem of clustered index fragmentation, but during testing no performance issues were detected. Changing index fillfactor might help if performance issues arise, but this is not covered by this article.
After we have manually populated server list, we can execute SSIS package that loads the metadata and executes required procedures to create staging tables.
Figure 1: Fetching server metadata (source: SSIS
Package flow is simple – it prepares a list of servers and then repeats the process for each server. It uses a lookup task to determine new tables and columns. We don't track deleted columns on source as we generally want to keep them in staging; old rows will retain the values and new rows will be populated with NULLs.
Once we're happy with how the package works, we disable “Server iterator“ and enable “Parallel server iterator“, SSIS+ task, which takes the Server iterator as input and creates multiple packages that can execute in parallel.
Loading Data into Staging Tables
Now that we have dynamically prepared staging tables, we need to populate them with data which will also be done dynamically, depending on key columns and table size.
In each data load package, we add two parallel tasks, one for the server list and one for the table list, due to variable database and table sizes. This allows the process to use parallelism on table level even after all but one server have finished loading the data.
During initial testing, running all three processes in parallel, 4 core 2.4 GHz server with 32GB of RAM generated about 50GB of data in about an hour. Data was loaded from 4 different servers and 24 processes were running in parallel. In the current development stage this is more than enough, but in production we'll likely need to scale out a bit.
Full Table Load
Full table load is intended for tables that don't have any valid incremental or lookup key, or that are small enough to be loaded in full each time.
Figure 2: Full load - control flow (source: SSIS
We prepare a delete statement that clears out the table data for specific server before new data is loaded. After fetching the data from source, we add a derived column ServerName
that tells us where the data originates from. We cannot simply truncate the table because parallel task load would mutually negate each other. After the load completes, we log “success” or “fail” as a new table load status. In the case of success, we additionally log inserted row count to help with debugging any potential issues.
Figure 3: Full load - data flow (source: SSIS
Incremental Table Load
Incremental load can be applied for tables that make use of simple incremental key, primary or surrogate. The key needs to be incremental and unique to ensure reliable data transfer.
While this process provides fastest load, it won't recognize any updated or deleted rows, allowing us to use this process only for tables that naturally don't allow updates or deletes, e.g. tables that track status changes over time or financial transactions.
Figure 4: Incremental load - control flow (source: SSIS
We update the latest key value before inserting new data to allow for switching from one process to another. For example, we can initially load a large table using full load and then switch it to incremental load. It’s the same with full load; we add ServerName
column and log RowCount.
Figure 5: Incremental load - data flow (source: SSIS)
Lookup Table Load
Lookup load supports tracking of any data changes: insert, update and delete operations. It does so by comparing two data sets, source (new) and staging (old) data, and sending data to relevant data outputs.
Select statements from both New and Old are prepared based on metadata. Additionally, sort columns (unique index columns) are passed to Sort New task. While the select statement for New dataset is currently a simple “select all” statement, it can be updated to also consider RowVersion or soft delete columns (if they exist).
The select statement for Old dataset is a bit more complicated. Since we keep track of all data changes in staging, we need to ensure that the select statement returns only the latest version of the table. We do that by applying Row Number window function over key columns and sorting it by date in descending order. Selecting only rows with Row Number equal to 1 and additionally excluding any deleted rows (after calculating row number) will return us a “snapshot“ of the latest data held in staging.
Without that, we'd have 1:N comparisons in case of any deletes or updates leading to exponential data growth; in other words, simply misleading records. E.g. if a row was previously updated, both versions of the row would now be compared to source, leading to duplication. On the next run, all four rows would be compared, and so on.
Table difference task picks up sort column(s) from the “Sort New“ task and uses those as a data set key, while comparing differences between other columns:
- all rows found in New but not in Old are considered Inserts
- all rows found in Old but not New are sent to Deleted output
- all rows found in both datasets are compared and any differences sent to Updated output
During testing, the lookup load proved to be a CPU intensive operation taking about twice as long to complete as incremental load, so it is best to avoid putting a table through this process whenever possible.
Figure 6: Lookup load - control flow (source: SSIS
Figure 7: Lookup load - data flow (source: SSIS
Hopefully this article helped you consider an alternative for staging external relational data. There’s still a lot of manual work before us, such as deciding on dimensions and fact tables, designing the warehouse, developing the processes to support them and in the end implementing the DWH into a reporting platform (eg. Power BI). However, at least we saved a significant amount of time by creating a process that can automate
a large part of the grunt work, which definitely makes it beneficial and worthwhile.