Data Conversion – ETL with Validation

Data conversions, from one file format (or database) to another, involve an Extract from the source, a possible Transformation, and a Load into the target. ETL is an apt generalization of a great many data movement operations.

The Extract and Load ends of the process are clearly schema-driven and need to be independent of file format. The Transformation in the middle might be nothing more than changing the representation from the source schema’s type to the target schema’s type. It may also be more complex, involving joins across multiple sources, normalizations, or computation of derived fields. In general, the transformation portion is pure Python, and doesn’t need to be explored in any detail.
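For example, a Transformation that computes a derived field might look like the following sketch. The transform() function and the mean_y field are illustrative, not part of the demo:

def transform(doc: dict[str, Any]) -> dict[str, Any]:
    # Pure Python: derive a new field from the source fields.
    # (mean_y is a hypothetical derived field, for illustration only.)
    doc["mean_y"] = (doc["y1"] + doc["y2"] + doc["y3"]) / 3
    return doc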

The Extract and Load steps, however, do benefit from having a schema. This follows the design patterns shown earlier. See the demo/conversion.py file in the Git repository for the complete source.

Builder

Here’s the builder function to create an intermediate document from the data source.

def builder(row: Row) -> dict[str, Any]:
    # Extract the named columns, converting each cell value to float.
    doc = {
        col: float(row.name(col).value())
        for col in ("x123", "y1", "y2", "y3", "x4", "y4")
    }
    return doc

This is used as the Extract part of ETL.
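For example, the first row of Anscombe’s quartet produces an intermediate document like this:

{'x123': 10.0, 'y1': 8.04, 'y2': 9.14, 'y3': 7.46, 'x4': 8.0, 'y4': 6.58}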

Persistence

Here’s a context manager that handles persistence. It provides a save_json() method. A subclass can override this method to implement a “dry-run” data validation operating mode.

This is one version of the Load part of ETL.

class Persistent_Processing:
    stop_on_exception = True

    def __init__(self, target_path: Path) -> None:
        self.target_path = target_path
        self.target_file: TextIO

    def __enter__(self) -> "Persistent_Processing":
        self.target_file = self.target_path.open("w")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.target_file.close()
        return False

    def save_json(self, this_instance: JSON) -> None:
        # Write one document per line: newline-delimited JSON (NDJSON).
        self.target_file.write(json.dumps(this_instance) + "\n")
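A minimal usage sketch, with an illustrative target file name:

with Persistent_Processing(Path("target.ndjson")) as persistence:
    # Each call writes one line of newline-delimited JSON.
    persistence.save_json({"x123": 10.0, "y1": 8.04})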

Validation

This can be viewed as part of Extract or part of Transform. It makes sense to treat data validation as the first stage of any Transformation processing.

class Validation_Processing(Persistent_Processing):
    stop_on_exception = False

    def __enter__(self) -> "Validation_Processing":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        return False

    def save_json(self, this_instance: JSON) -> None:
        # Dry run: print the document instead of writing a file.
        print(json.dumps(this_instance))
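Because the subclass preserves the interface, a dry run is only a different class choice. A minimal sketch; the Path argument is illustrative, since this class never opens it:

with Validation_Processing(Path("unused.ndjson")) as persistence:
    # Prints the JSON document to stdout; no file is created.
    persistence.save_json({"x123": 10.0})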

Target Schema for Output

This schema defines the Load operation. In conjunction with the Persistent_Processing class, it implements the “load” part of ETL. In this case, the load writes a file.

TARGET_SCHEMA = {
    "title": "Anscombe's Quartet",
    "type": "object",
    "properties": {
        "x123": {"type": "number"},
        "y1": {"type": "number"},
        "y2": {"type": "number"},
        "y3": {"type": "number"},
        "x4": {"type": "number"},
        "y4": {"type": "number"},
    },
}
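A quick sketch of the schema at work; the invalid string value is deliberate:

validator = Draft202012Validator(TARGET_SCHEMA)
good = {"x123": 10.0, "y1": 8.04, "y2": 9.14, "y3": 7.46, "x4": 8.0, "y4": 6.58}
bad = {"x123": "not a number", "y1": 8.04}
print(validator.is_valid(good))  # True
print(validator.is_valid(bad))   # False: "x123" fails the "number" type check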

Process Sheet

The source workbook has one (or more) sheets of data. The process_sheet() function extracts, transforms, and loads all of the rows of a single sheet.

def process_sheet(sheet: Sheet, persistence: Persistent_Processing) -> Counter[str]:
    # Build the validator once; it is reused for every row.
    validator = Draft202012Validator(TARGET_SCHEMA)
    counts: Counter[str] = Counter()
    for row in sheet.rows():
        counts["input"] += 1
        # Convert to an intermediate form
        doc = builder(row)
        # Validate against the target JSON Schema
        if validator.is_valid(doc):
            # Persist the valid data.
            persistence.save_json(doc)
            counts["output"] += 1
        else:
            # Report on the invalid data
            counts["invalid"] += 1
            print(f"error, {row} produced invalid {doc}")
            for error in validator.iter_errors(doc):
                print(" ", error)
    return counts
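For a clean source, every input row becomes an output row. One sheet of Anscombe’s quartet (11 rows) would yield counts like:

Counter({'input': 11, 'output': 11})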

Main Program

Argument parsing looks like this:

def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("file", type=Path, nargs="+")
    parser.add_argument("-o", "--output", type=Path)
    parser.add_argument("-d", "--dry-run", default=False, action="store_true")
    parser.add_argument(
        "-v",
        "--verbose",
        dest="verbosity",
        default=logging.INFO,
        action="store_const",
        const=logging.DEBUG,
    )
    args = parser.parse_args(argv)
    # Without --dry-run, an output file is required.
    if not args.dry_run and args.output is None:
        parser.error("--output is required unless --dry-run is given")
    return args
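A quick check of the parser, with an illustrative file name:

args = parse_args(["-d", "-o", "target.ndjson", "source.xlsx"])
# Namespace(dry_run=True, file=[PosixPath('source.xlsx')],
#           output=PosixPath('target.ndjson'), verbosity=20)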

The overall main() function:

def main(argv: list[str] = sys.argv[1:]) -> None:
    args = parse_args(argv)
    # Use the parsed verbosity to configure logging.
    logging.basicConfig(level=args.verbosity)
    mode_class = Validation_Processing if args.dry_run else Persistent_Processing
    with mode_class(args.output) as persistence:
        for source in args.file:
            with open_workbook(source) as workbook:
                sheet = workbook.sheet("Sheet1")
                sheet.set_schema_loader(HeadingRowSchemaLoader())
                counts = process_sheet(sheet, persistence)
            pprint(counts)
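A typical command-line session might look like this; the file names are illustrative:

python demo/conversion.py -o target.ndjson source.xlsx
python demo/conversion.py --dry-run source.xlsx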