
Using Pipeline Targets

Using Target

Data Transformation materializes the data as a flattened Hive table after performing all the operations, such as joins, aggregations, and unions, on the big data sources defined by the pipeline design. The target node is used to specify the details of the table, such as its schema name, table name, and target file system path.

NOTES

  • All target nodes except the Snowflake Target are disabled for pipelines in the Snowflake environment.
  • All target nodes except the BigQuery Target are disabled for pipelines in the BigQuery environment.
  • If an imported pipeline contains a node that is not supported in a data warehouse pipeline, that node is shown in the error state every time the editor page of that pipeline is opened. This error is also visible in the Warnings and Errors modal of the pipeline editor page.
  • Only the Target node is supported for pipelines with the Databricks SQL execution type.

Following are the steps to apply the Target node in a pipeline:

  1. Double-click the Target node. The Properties page is displayed.

NOTE No target column, apart from the system-generated audit columns, can start with "ziw".

  2. Click Edit Properties. The following page is displayed.
  3. Edit the relevant properties, and click Save.
  4. Sync Type determines whether the existing data (if any) will be overwritten or updated.

You can also select Append to append the data of one pipeline target to an already built pipeline target. This is achieved by specifying a reference table in the pipeline target.

On setting the Sync Type to Append, Merge, or Insert Overwrite, a dropdown for Reference Table appears. All built pipeline tables are available as reference table options. On selecting a reference table, properties such as the schema name, table name, and target file system path are displayed. The appendable/mergeable/insert-overwritable target uses these properties of the reference table by default. These properties are not editable from the target node.

There is also a separate section for column mappings between the reference table and the target node. In the mapping section, all columns from the reference table are listed on the left-hand side, while all columns from the target node are listed on the right-hand side.

You can reorder the columns (using the edit feature) from the target node if the order of the target columns does not match the order of the reference table columns. The number of target columns must be greater than or equal to the number of reference table columns. All unmapped columns from the target table are excluded automatically on saving. All audit columns must be mapped to their respective audit columns. For all other columns, the data types of the mapped columns must match.

If a column name in the target differs from the corresponding mapped column of the reference table, you are prompted to rename the column; on clicking OK, the column name in the target node is renamed. On saving the mapping section, the schema of the target table is changed to match the schema of the reference table.

SCD Type includes two options, SCD 1 and SCD 2. These are the two models supported for the Merge sync type.

SCD 1 (Slowly Changing Dimension type 1): SCD 1 maintains only the most recently updated records (no history) in the table, while SCD 2 also maintains history. For a mergeable target, you can select a reference table of type SCD 1 or SCD 2, depending on the requirement.

SCD 2 (Slowly Changing Dimension type 2): To maintain history, different levels of granularity are supported: Year, Month, Day, Hour, Minute, and Second.

For example, if you configure the granularity as Day for a record, only the most recent update per day is maintained. For any updates that happen on the next day, only the latest update for that day is maintained. To maintain all history, set the granularity level to Second. By default, the granularity level is Second.

Following are the three audit columns used in SCD 2:

ZIW_ACTIVE: The value is true for all active records and false for history records.

ZIW_TARGET_START_TIMESTAMP: Maintains the start timestamp of a record. For example, for Day-level granularity, the value is the first second of the day, and for Hour-level granularity, the first second of the hour.

ZIW_TARGET_END_TIMESTAMP: Maintains the end timestamp of a record. For active records, the value is an INFINITY timestamp, whereas for non-active records, the value is the last second of the day/hour/month, and so on.
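
For illustration only, the following minimal Python sketch (not Infoworks code) shows how a record's ZIW_TARGET_START_TIMESTAMP could be derived by truncating the update time to the configured granularity; the function name and the INFINITY placeholder are assumptions made for this example.

  from datetime import datetime

  INFINITY = datetime(9999, 9, 9)  # placeholder "infinity" end timestamp for active records

  def truncate_to_granularity(ts: datetime, granularity: str) -> datetime:
      """Return the first second of the period that contains ts."""
      fields = ["year", "month", "day", "hour", "minute", "second"]
      keep = fields[: fields.index(granularity.lower()) + 1]
      parts = {f: getattr(ts, f) for f in keep}
      # Fields finer than the granularity are reset to their minimum values.
      return datetime(parts.get("year", 1), parts.get("month", 1), parts.get("day", 1),
                      parts.get("hour", 0), parts.get("minute", 0), parts.get("second", 0))

  # Day-level granularity: the start timestamp is the first second of the day.
  print(truncate_to_granularity(datetime(2020, 9, 7, 14, 35, 12), "Day"))   # 2020-09-07 00:00:00
  # Hour-level granularity: the start timestamp is the first second of the hour.
  print(truncate_to_granularity(datetime(2020, 9, 7, 14, 35, 12), "Hour"))  # 2020-09-07 14:00:00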

  • Granularity includes options like Year, Month, Day, Hour, Minute and Second.
  • Configuring Granularity: You can also set the granularity level using pipeline configuration with the key dt_scd2_granularity.
  • Non SCD 2 Columns: All columns will be considered SCD 2 columns by default. Select the columns to be excluded from SCD 2.

NOTE These settings must be performed in the reference pipeline and not in the merge target pipeline.

  • Storage Format: Select the required storage format. The options include Delta, Parquet, CSV, JSON, Avro, and ORC.

NOTE You cannot change the storage format of an already built target to the Delta format. To change to the Delta storage format, you must rename the target table name and the target location. This is because Delta maintains the metadata along with the data at the target location, and the build fails if it does not find the metadata at the target location.

  • Natural Keys is a set of columns that uniquely identifies a record. If no such columns exist, you can use a Derive node with iwuuid() as the derive expression.

EXAMPLE Let us look at an example to understand how the history of records is maintained when there are updates to SCD2 and non-SCD2 attributes.

Following is a record with three attributes, ID, First_Name, and Last_Name, as of 9/7/2020. First_Name is configured as an SCD2 attribute, while Last_Name is configured as a non-SCD2 attribute. The granularity is set to Second (the default).

ID | First_Name | Last_Name | ZIW_Active | ZIW_ST   | ZIW_ET   | Active/Inactive
1  | Fname1     | LName1    | TRUE       | 9/7/2020 | 9/9/9999 | Active Record

Now, let's say that on 9/10/2020, the First_Name value for this record was updated to "Fname2". Since the First_Name attribute is marked as SCD2, the history is maintained and the updated target records now appear as given below:

ID | First_Name | Last_Name | ZIW_Active | ZIW_ST    | ZIW_ET    | Active/Inactive
1  | Fname1     | LName1    | FALSE      | 9/7/2020  | 9/10/2020 | Inactive Record
1  | Fname2     | LName1    | TRUE       | 9/10/2020 | 9/9/9999  | Active Record

Now, let's say that on 9/15/2020, the Last_Name value for this record was updated to "LName2". Since the Last_Name attribute is marked as non-SCD2, the history is not maintained and the same active record is updated directly:

ID | First_Name | Last_Name | ZIW_Active | ZIW_ST    | ZIW_ET    | Active/Inactive
1  | Fname1     | LName1    | FALSE      | 9/7/2020  | 9/10/2020 | Inactive Record
1  | Fname2     | LName2    | TRUE       | 9/15/2020 | 9/9/9999  | Active Record
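
The behavior shown in the tables above can be summarized in a minimal Python sketch (not Infoworks code); the function name, row representation, and INFINITY placeholder are illustrative assumptions only.

  from datetime import datetime

  INFINITY = datetime(9999, 9, 9)

  def apply_update(active_row, update, scd2_columns, update_time):
      """Apply an update to the currently active row of a natural key."""
      changed = {c for c, v in update.items() if active_row.get(c) != v}
      if changed & set(scd2_columns):
          # An SCD 2 attribute changed: close the old record and open a new active one.
          closed = {**active_row, "ZIW_ACTIVE": False, "ZIW_TARGET_END_TIMESTAMP": update_time}
          opened = {**active_row, **update, "ZIW_ACTIVE": True,
                    "ZIW_TARGET_START_TIMESTAMP": update_time,
                    "ZIW_TARGET_END_TIMESTAMP": INFINITY}
          return [closed, opened]
      # Only non-SCD 2 attributes changed: update the active record in place, no history row.
      return [{**active_row, **update, "ZIW_TARGET_START_TIMESTAMP": update_time}]

  row = {"ID": 1, "First_Name": "Fname1", "Last_Name": "LName1", "ZIW_ACTIVE": True,
         "ZIW_TARGET_START_TIMESTAMP": datetime(2020, 9, 7), "ZIW_TARGET_END_TIMESTAMP": INFINITY}

  # 9/10/2020: First_Name (SCD 2) changes -> one inactive history row plus a new active row.
  rows = apply_update(row, {"First_Name": "Fname2"}, ["First_Name"], datetime(2020, 9, 10))
  # 9/15/2020: Last_Name (non-SCD 2) changes -> the active row is updated in place.
  rows = rows[:-1] + apply_update(rows[-1], {"Last_Name": "LName2"}, ["First_Name"], datetime(2020, 9, 15))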

LIMITATION iwuuid() cannot be chosen as a natural key when incremental load is set to False and the Sync Type is Merge/Insert Overwrite.

  • Number of Buckets and Bucketing Columns: The custom bucketing configuration is used to change the bucketing logic. By default, bucketing is based on the selected natural key columns. However, if you know that the join columns used are not the same as the natural keys, you can change the bucketing columns, which improves performance. If this section is left blank, Infoworks internally clusters the data in the table based on the selected natural key columns. Enter the required values appropriately.
  • Primary Partition Columns: This is an optional field. A column with low cardinality whose values do not get updated is a good candidate for primary partitioning. It can help speed up queries that use this column in a filter or join condition.
  • Index Columns is a set of columns that enables improved query processing time. Indexing uses Bloom filters in ORC files to index any column type. When a target is created in the ORC format, the system passes the index columns to create indexes in the ORC file.

WARNING Some data types degrade the performance of queries on the indexed columns.

  • Sort By Columns are the columns by which the data is sorted within each cluster.
  • Delete Records Column is a column whose value is true for records that are considered deleted. The column selected in the Delete Records Column drop-down list is automatically mapped to the ZIW_IS_DELETED audit column.

NOTES

  • The ZIW_IS_DELETED audit column will be available in the target output only if a column is selected in the Delete Records Column drop-down list.
  • If two or more existing artifacts share the same target file system path, the build will fail unless you change the target configuration.
  • During Pipeline build, existing target tables are available for external tools.
  • When using any pipeline target in APPEND mode, the detect_duplicates_in_data_fetch advanced configuration must be set to true at the source level before running CDC.
  • If an SCD2 target build fails in the Spark environment with a DateTimeParseException (for instance, Root cause error: java.time.format.DateTimeParseException), set spark.sql.legacy.timeParserPolicy=LEGACY as an advanced configuration at the pipeline level.
  • If an SCD2 pipeline merge job fails with the "org.apache.spark.sql.AnalysisException: Undefined function: 'iwtime_similarity'." error on a persistent cluster in the Spark-Databricks environment, re-run the job using an ephemeral cluster.
  • In case of an update pipeline, when the pipeline uses an InNotIn or Exist node before the target node, it is recommended to use natural keys and to set the advanced configuration dt_update_using_natural_keys to true.

Limitations

  • Changing the values of Primary Partition Columns and Bucketing Columns for records is not supported. In Merge mode for SCD 1/SCD 2, any change in the primary partition, bucketing, or natural key columns results in the insertion of a new record.
  • In Insert Overwrite sync mode, any change in the primary partition or bucketing columns results in the insertion of a new record, and the update is skipped for a row that has an update in any column other than the watermark column.
  • For the Insert Overwrite sync mode, the Natural Keys must be exactly the same as the Natural Keys configured during data onboarding (ingestion).
  • Spark does not provide APIs to handle Compression Formats. It only supports SNAPPY.
  • The null data type is not supported if the target file format is Parquet.
  • SCD 2 targets do not support complex data types such as map, array, union, etc.

Spark Hive Compatibility

Spark bucketing semantics are different from Hive bucketing semantics. This behaviour can be controlled by the Hive Compatible Target check box.

Infoworks Data Transformation handles these differences using the following methods:

Hive Compatible Targets

  • No bucketing is applied. Bucket columns are used as secondary partition columns.
  • The ZIW_SEC_P column is automatically created.
  • The ZIW_SEC_P value is calculated as hashcode(bucket columns) % number of buckets, as sketched after the note below.

NOTE Hive compatible targets have a limitation in handling partition values with spaces in Merge mode.
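
As a rough illustration of the ZIW_SEC_P formula above, the following minimal Python sketch (not Infoworks code) derives a secondary-partition value from the bucket columns; the hash function shown is a stand-in, as the actual hashcode implementation is not documented here.

  def ziw_sec_p(row, bucket_columns, num_buckets):
      """Secondary partition value: hashcode of the bucket column values modulo the number of buckets."""
      combined = "|".join(str(row[c]) for c in bucket_columns)
      return hash(combined) % num_buckets  # stand-in hash; the real hashcode may differ

  row = {"customer_id": 1042, "region": "EMEA"}
  print(ziw_sec_p(row, ["customer_id"], 8))  # one of 8 secondary partitions (ZIW_SEC_P in 0..7)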

Non-Hive Compatible Targets

Spark bucketing will be applied. These targets will not be readable by Hive.

Spark SQL and Hive SQL expressions are compatible in the majority of cases. However, there are some differences. Some of the known issues are captured here:

  • Hive supports MONTH on the decimal and date types; however, Spark supports it only on the date type.
  • The Hive variance function aliases to var_pop, while the Spark variance function aliases to var_samp.
  • Hive UNION and Spark UNION have different behaviour.

For example, consider select col1 from tab1 union all select col1 from tab2 (see the sketch after this list). If tab1.col1 is decimal(7,2) and tab2.col1 is decimal(38,0):

  • The Hive target column is decimal(38,0) (rounds off any decimal value to the nearest integer).
  • The Spark target column is decimal(38,2) (no round-off).
  • The iw_rowid function returns different row IDs for Spark and Hive when decimal values are present in the natural keys.
  • Spark CAST as timestamp expects epoch seconds, while Hive CAST expects milliseconds.
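
The Spark side of the UNION example above can be reproduced with a small PySpark sketch; the table and column names are hypothetical and used only for illustration.

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.appName("union-decimal-example").getOrCreate()

  spark.sql("SELECT CAST(123.45 AS DECIMAL(7,2)) AS col1").createOrReplaceTempView("tab1")
  spark.sql("SELECT CAST(99 AS DECIMAL(38,0)) AS col1").createOrReplaceTempView("tab2")

  result = spark.sql("SELECT col1 FROM tab1 UNION ALL SELECT col1 FROM tab2")
  result.printSchema()  # col1 resolves to decimal(38,2) in Spark; Hive would produce decimal(38,0)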

For any other issues, see the Spark SQL documentation.

Additive Schema Handling

If only new columns have been added to a source table or pipeline, additive schema handling alters the table to add these columns at the end of the table. Additive schema handling is supported only in the Merge mode.

The additive schema changes are displayed in the build log summary of the pipeline.
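
Conceptually, the additive change is equivalent to appending the new columns to the end of the existing table, along the lines of the following Spark SQL statement (the table and column names are hypothetical):

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.enableHiveSupport().getOrCreate()

  # Append the new column at the end of the table; existing rows read it back as null,
  # which matches the first limitation listed below.
  spark.sql("ALTER TABLE analytics.orders_target ADD COLUMNS (discount_code STRING)")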

Limitations

  • The rows for which the new column values are not defined are assigned null values.
  • Data type of additive columns can only be primitive.
  • Additive columns cannot be part of target properties like natural keys, primary partition columns, etc.
  • Build time of additive schema handling in merge mode is comparatively higher.
  • Additive schema handling will be executed only when no Reference Table is set for Merge Mode in the Target Properties window.
  • Additive schema is currently not supported for MapR-DB targets.

Pipeline Merge

An optimized merge algorithm has been introduced to overcome the performance issue of the pipeline merge. The issue occurred because the target table in the pipeline was merged partition by partition, and performance degraded as the number of partitions in the target table increased.

By default, the optimized pipeline merge is used. To switch back to the previous pipeline merge, set the following configuration in the pipeline advanced configurations: dt_use_partition_level_merge=true.

BigQuery Labels

This feature enables you to add labels to your tables in BigQuery. For more information about BigQuery labels, refer to BigQuery Labels.

Case Sensitive Target Tables

In previous releases, external/export pipeline targets did not support case-sensitive column names. However, starting from version 6.1.3, support has been introduced to preserve the case of column names during export and in external pipelines.

This functionality can be enabled or disabled using the following configurations:

  1. In the pipeline's advanced configuration, set the parameter dt_is_table_case_sensitive to true.

    1. By default, this configuration is set to false for all targets, except Oracle and Teradata. It has no effect on Cosmos.
    2. When set to true, the case sensitivity for column names is enforced by enclosing the column names in double quotes.
    3. When set to false (the default), columns are saved according to the system's default case.
  2. To retain the column case in Cosmos, set the pipeline advanced configuration dt_cosmos_columns_lowercase to false.

    1. Cosmos is case-sensitive by default, as collections are accessed using the exact case of the column.
    2. dt_cosmos_columns_lowercase is set to false by default to preserve the previous behavior.
    3. It is not recommended to add this configuration in incremental pipelines after the first build, as it will create new columns with the case retained in their names.
  3. To enable case-sensitive schema alignment, set the pipeline advanced configuration dt_case_sensitive_schema_alignment to true.

    1. By default, the configuration value is set to false.
    2. This configuration applies only in incremental modes.
    3. When set to true, it resolves column case mismatches between the target and the delta table by aligning the column case to match the target.
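
For example, to preserve column name case end to end, the pipeline advanced configuration could contain entries such as the following (the keys and values are those described in the list above; shown here only as an illustrative summary):

  dt_is_table_case_sensitive=true
  dt_cosmos_columns_lowercase=false
  dt_case_sensitive_schema_alignment=true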

NOTES

  • The above configuration has no effect on native targets. It can be set in both domain and pipeline advanced configurations.
  • Points 2 and 3 are newly added configurations.