Data Profiling

This is a data profiling application that examines the data in workbooks. The resulting profile can help design builder functions for other applications. The idea is that when a data profiling run fails, there are three possible problems:

  • The data has changed and this is the first we’ve heard of the change. We need to extend the application to handle this.

  • Our previous understanding of the data was incomplete, and this data shows a previously unknown case. This suggests our application had a latent bug.

  • The data really is bad. The data must be rejected and reworked.

Finding out which of these three problems applies requires the concrete details that a data profiling application provides.

This produces simple RST-format output on stdout. A common use case is the following:

python demo/profile.py sample/*.csv >profile_csv.rst
rst2html.py profile_csv.rst profile_csv.html

This gives us an HTML-formatted report showing distributions of values in each column.

This will follow a number of the design patterns shown earlier. See the demo/profile.py file in the Git repository for the complete source.

Core Processing

The core processing is to gather counts of individual sample values.

from collections import Counter, defaultdict
import contextlib
from io import StringIO
from typing import Any

class Stats:
    """Gathers a count of each distinct value in each column of a sheet."""

    def __init__(self, sheet: Sheet) -> None:
        # Sheet is the workbook access class used throughout demo/profile.py.
        self.sheet = sheet
        # Maps a column name to a Counter of the values seen in that column.
        self.stats: defaultdict[str, Counter] = defaultdict(Counter)

    def sample(self, column: str, value: Any) -> None:
        self.stats[column][value] += 1

    def report(self) -> None:
        title = f"{self.sheet.workbook().name} :: {self.sheet.name}"
        print(title)
        print("=" * len(title))
        print()
        for name, attr in self.sheet.schema.properties.items():
            print(name)
            print("-" * len(name))
            print()
            print(attr)
            print()
            print("..  csv-table::")
            print()
            for k in self.stats[name]:
                print(f'    "{k}","{self.stats[name][k]}"')
            print()

    def serialize(self) -> str:
        buffer = StringIO()
        with contextlib.redirect_stdout(buffer):
            self.report()
        return buffer.getvalue()

The serialization uses the print() function to create the needed report. This lets us tinker with string formatting, and even switch to string.Template or the Jinja template processing tool. It also lets us use contextlib.redirect_stdout to capture the print() output in a string buffer.
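As an illustration of the template alternative, here is a minimal sketch using string.Template to produce one csv-table row; the ROW name is ours, not part of the demo:

from string import Template

# A hypothetical template for a single row of the csv-table output.
ROW = Template('    "$value","$count"')

line = ROW.substitute(value="8.0", count=1)
# line is now '    "8.0","1"', matching what report() prints for one value.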

Processing Context

A data profiling application doesn’t, generally, produce much stateful output. It doesn’t often update a database.

This is a “reporting” application. In effect, it’s an elaborate query. This means it has two important features:

  • The output is meant for human consumption.

  • The processing is fully idempotent. We can run the application as often as needed without worrying about corrupting a database with out-of-order operations or duplicate operations.

Because there isn’t a significant, persistent state change, a persistence management class is, perhaps, unnecessary.

It helps, however, to follow the overall design pattern in case some persistent state change does become part of this application.

from types import TracebackType
from typing import Optional

class Profile_Processing:
    """A subclass might avoid printing the results."""

    def __init__(self, fail_fast: bool = False) -> None:
        self.stop_on_exception = fail_fast

    def save_stats(self, stats: Stats) -> None:
        print(stats.serialize())

    def __enter__(self) -> "Profile_Processing":
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # There's no persistent state to clean up on exit.
        pass

This design lets us write a subclass that provides an alternative definition of the save_stats() method. We can then use that subclass to change the output processing.
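For example, a hypothetical subclass (our illustration, not part of demo/profile.py) could append each report to a file instead of printing it:

from pathlib import Path

class File_Profile_Processing(Profile_Processing):
    """Saves the statistics to a file instead of stdout."""

    def __init__(self, target: Path, fail_fast: bool = False) -> None:
        super().__init__(fail_fast)
        self.target = target

    def save_stats(self, stats: Stats) -> None:
        # Append, so the reports for all sheets accumulate in one file.
        with self.target.open("a") as output:
            output.write(stats.serialize())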

Sheet Processing

The process_sheet() function is the heart of the application. It handles all the rows present in a given sheet.

import logging

logger = logging.getLogger(__name__)

def process_sheet(sheet: Sheet, mode: Profile_Processing) -> Counter:
    audit_counts: Counter = Counter()
    statistics = Stats(sheet)
    for source_row in sheet.rows():
        try:
            audit_counts["read"] += 1
            for name in sheet.schema.properties:
                statistics.sample(name, source_row[name].value())
        except Exception as e:
            audit_counts["invalid"] += 1
            if mode.stop_on_exception:
                raise
            logger.error(f"{e}")  # The offending row's details could be logged, too.
            audit_counts[("error", type(e))] += 1

    mode.save_stats(statistics)
    return audit_counts

Some applications will have variant processing for workbooks that contain different types of sheets. This leads to distinct functions, for example process_this_sheet() and process_that_sheet(), each following the above template to process all the rows of its sheet.
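A minimal sketch of such a dispatch, assuming the sheet's name selects the variant (the rule and both functions here are hypothetical):

def process_sheet_variants(sheet: Sheet, mode: Profile_Processing) -> Counter:
    # Hypothetical rule: the sheet's name selects the processing function.
    if sheet.name == "This":
        return process_this_sheet(sheet, mode)
    else:
        return process_that_sheet(sheet, mode)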

Workbook Processing

The process_workbook() function applies process_sheet() to every sheet in a workbook. The HeadingRowSchemaLoader builds each sheet's schema from the sheet's heading row.

import pprint

def process_workbook(input: Workbook, mode: Profile_Processing) -> None:
    for sheet in input.sheet_iter():
        logger.info(f"{input.name} :: {sheet.name}")
        sheet.set_schema_loader(HeadingRowSchemaLoader())
        counts = process_sheet(sheet, mode)
        logger.info(pprint.pformat(dict(counts)))

Command-Line Interface

We have optional arguments for verbosity and fail-fast behavior, plus a positional argument that provides all the files to profile. This function parses the command-line arguments:

import argparse
from pathlib import Path

def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("file", type=Path, nargs="+")
    parser.add_argument("-f", "--fail-fast", default=False, action="store_true")
    parser.add_argument(
        "-v",
        "--verbose",
        dest="verbosity",
        default=logging.INFO,
        action="store_const",
        const=logging.DEBUG,
    )
    return parser.parse_args(argv)
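Given these definitions, parsing a typical command line yields a namespace like this:

args = parse_args(["-v", "sample/simple.csv"])
# The -v flag replaces the INFO default: args.verbosity == logging.DEBUG
# args.fail_fast is False
# args.file == [Path("sample/simple.csv")]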

Here’s the overall main() function:

import sys

def main(argv: list[str] = sys.argv[1:]) -> None:
    args = parse_args(argv)
    logging.getLogger().setLevel(args.verbosity)
    with Profile_Processing(args.fail_fast) as mode:
        try:
            for input in args.file:
                # open_workbook() is part of the same workbook access layer as Sheet.
                with open_workbook(input) as source:
                    process_workbook(source, mode)
        except Exception as e:
            logging.error(e)
            raise
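To run demo/profile.py as a script, the usual entry-point guard is needed. A minimal sketch, assuming we also configure logging here, since main() only sets the level:

if __name__ == "__main__":
    logging.basicConfig(stream=sys.stderr, level=logging.INFO)
    main()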

Running the Demo

We can run this program like this:

python3 demo/profile.py sample/*.csv >profile_csv.rst
rst2html.py profile_csv.rst profile_csv.html

The RST output file looks like this:

sample/Anscombe_quartet_data.csv
====================================

x123
----

AtomicSchema({'title': 'x123', '$anchor': 'x123', 'type': 'string', 'position': 0})

..  csv-table::

    "10.0","1"
    "8.0","1"
    "13.0","1"
    "9.0","1"
    "11.0","1"
    "14.0","1"
    "6.0","1"
    "4.0","1"
    "12.0","1"
    "7.0","1"
    "5.0","1"

y1
--

AtomicSchema({'title': 'y1', '$anchor': 'y1', 'type': 'string', 'position': 1})

..  csv-table::

    "8.04","1"
    "6.95","1"
    "7.58","1"
    "8.81","1"
    "8.33","1"
    "9.96","1"
    "7.24","1"
    "4.26","1"
    "10.84","1"
    "4.82","1"
    "5.68","1"

y2
--

AtomicSchema({'title': 'y2', '$anchor': 'y2', 'type': 'string', 'position': 2})

..  csv-table::

    "9.14","1"
    "8.14","1"
    "8.74","1"
    "8.77","1"
    "9.26","1"
    "8.10","1"
    "6.13","1"
    "3.10","1"
    "9.13","1"
    "7.26","1"
    "4.74","1"

y3
--

AtomicSchema({'title': 'y3', '$anchor': 'y3', 'type': 'string', 'position': 3})

..  csv-table::

    "7.46","1"
    "6.77","1"
    "12.74","1"
    "7.11","1"
    "7.81","1"
    "8.84","1"
    "6.08","1"
    "5.39","1"
    "8.15","1"
    "6.42","1"
    "5.73","1"

x4
--

AtomicSchema({'title': 'x4', '$anchor': 'x4', 'type': 'string', 'position': 4})

..  csv-table::

    "8.0","10"
    "19.0","1"

y4
--

AtomicSchema({'title': 'y4', '$anchor': 'y4', 'type': 'string', 'position': 5})

..  csv-table::

    "6.58","1"
    "5.76","1"
    "7.71","1"
    "8.84","1"
    "8.47","1"
    "7.04","1"
    "5.25","1"
    "12.50","1"
    "5.56","1"
    "7.91","1"
    "6.89","1"


sample/Anscombe_schema.csv
==============================

x123
----

AtomicSchema({'title': 'x123', '$anchor': 'x123', 'type': 'string', 'position': 0})

..  csv-table::

    "y1","1"
    "y2","1"
    "y3","1"
    "x4","1"
    "y4","1"

X values for series 1, 2, and 3.
--------------------------------

AtomicSchema({'title': 'X values for series 1, 2, and 3.', '$anchor': 'X_values_for_series_1_2_and_3.', 'type': 'string', 'position': 1})

..  csv-table::

    "Y value for series 1.","1"
    "Y value for series 2.","1"
    "Y value for series 3.","1"
    "X value for series 4.","1"
    "Y value for series 4.","1"

number
------

AtomicSchema({'title': 'number', '$anchor': 'number', 'type': 'string', 'position': 2})

..  csv-table::

    "number","5"


sample/csv_workbook.csv
===========================

Col 1 - int
-----------

AtomicSchema({'title': 'Col 1 - int', '$anchor': 'Col_1_-_int', 'type': 'string', 'position': 0})

..  csv-table::

    "42","1"
    "9973","1"

Col 2.0 - float
---------------

AtomicSchema({'title': 'Col 2.0 - float', '$anchor': 'Col_2.0_-_float', 'type': 'string', 'position': 1})

..  csv-table::

    "3.1415926","1"
    "2.7182818","1"

Column "3" - string
-------------------

AtomicSchema({'title': 'Column "3" - string', '$anchor': 'Column_3_-_string', 'type': 'string', 'position': 2})

..  csv-table::

    "string","1"
    "data","1"

Column '4' - date
-----------------

AtomicSchema({'title': "Column '4' - date", '$anchor': 'Column_4_-_date', 'type': 'string', 'position': 3})

..  csv-table::

    "09/10/56","1"
    "01/18/59","1"

Column 5 - boolean
------------------

AtomicSchema({'title': 'Column 5 - boolean', '$anchor': 'Column_5_-_boolean', 'type': 'string', 'position': 4})

..  csv-table::

    "TRUE","1"
    "FALSE","1"

Column 6 - empty
----------------

AtomicSchema({'title': 'Column 6 - empty', '$anchor': 'Column_6_-_empty', 'type': 'string', 'position': 5})

..  csv-table::

    "","2"

Column 7 - Error
----------------

AtomicSchema({'title': 'Column 7 - Error', '$anchor': 'Column_7_-_Error', 'type': 'string', 'position': 6})

..  csv-table::

    "#DIV/0!","1"
    "#NAME?","1"


sample/simple.csv
=====================

name
----

AtomicSchema({'title': 'name', '$anchor': 'name', 'type': 'string', 'position': 0})

..  csv-table::

    "Col 1 - int","1"
    "Col 2.0 – float","1"
    "Column “3” - string","1"
    "Column '4' – date","1"
    "Column 5 – boolean","1"
    "Column 6 – empty","1"
    "Column 7 – Error","1"

offset
------

AtomicSchema({'title': 'offset', '$anchor': 'offset', 'type': 'string', 'position': 1})

..  csv-table::

    "1","1"
    "12","1"
    "23","1"
    "34","1"
    "45","1"
    "56","1"
    "67","1"

size
----

AtomicSchema({'title': 'size', '$anchor': 'size', 'type': 'string', 'position': 2})

..  csv-table::

    "11","7"

type
----

AtomicSchema({'title': 'type', '$anchor': 'type', 'type': 'string', 'position': 3})

..  csv-table::

    "int","1"
    "float","1"
    "str","3"
    "datetime","1"
    "bool","1"

This can be processed by pandoc or docutils to create an HTML report.