Headline

Data Warehouse and reporting implementation for 101companies using Language:Python.

Characteristics

A Data Warehouse is a database used for reporting and data analysis. This Language:Python based contribution implements a so-called ETL (Extract, Transform, Load) process. It extracts data from multiple databases that use the database schema of Contribution:mySqlOneReporting, transforms the data (e.g. by converting salaries in different currencies), and loads it into a new star database schema, tracking the history and removing unneeded data. Afterwards it generates charts based on the historic data.

Illustration

The contribution consists of two parts:

  • The ETL
  • The chart generator

ETL

Extract

The contribution works with one or more source databases using the database schema described in Contribution:mySqlOneReporting. The schema implements the additional dimensions gender, age (stored as date of birth, dob) and job role.

ERM of source database schema

media:https://raw.github.com/ydupont/101repo/master/wiki/feature dimensionality/img/src er.png

NOTE: The enum 'gender' can have the values 'UNDEFINED','FEMALE' and 'MALE'

At regular intervals the ETL queries the source databases. The extracted data is loaded into source models.

Excerpt of etl/models.py

class SourceEmployee(object):
    """
    Represents the row data for the src_N.employee table:
    id, name, address, salary, manager, did, jid, gender, dob
    """
    def __init__(self, *args):
        assert len(args) == 9
        self.id = args[0]
        self.name = args[1]
        self.address = args[2]
        self.salary = args[3]
        self.manager = args[4]
        self.did = args[5]
        self.jid = args[6]
        self.gender = args[7]
        self.dob = args[8]
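
The extract step can be pictured as wrapping each fetched row in a source model. The following is a minimal sketch, assuming rows arrive as nine-element tuples in the column order given in the docstring; the helper `to_source_employees` and the sample rows are made up for illustration and are not part of the contribution.

```python
import datetime


class SourceEmployee(object):
    """Same shape as the excerpt: nine positional row values."""
    def __init__(self, *args):
        assert len(args) == 9
        (self.id, self.name, self.address, self.salary, self.manager,
         self.did, self.jid, self.gender, self.dob) = args


def to_source_employees(rows):
    """Wraps each raw row tuple in a SourceEmployee (hypothetical helper)."""
    return [SourceEmployee(*row) for row in rows]


# Made-up rows standing in for the result of cursor.fetchall().
rows = [
    (1, "Alice", "Koblenz", 50000.0, 1, 10, 3, "FEMALE",
     datetime.date(1980, 2, 29)),
    (2, "Bob", "Bonn", 42000.0, 0, 10, 4, "MALE",
     datetime.date(1975, 7, 1)),
]
employees = to_source_employees(rows)
print(employees[0].name, employees[1].gender)
```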

Transform

Once all data has been extracted and loaded into models, the transformation takes place. Transformation is needed to unify different data structures and to derive new attributes from the gathered information. In this case salary currencies are converted and each employee's age is calculated using the date of extraction as the reference point.

Excerpt of etl/transform.py

    def employees(self, source_employees, cid):
        """
        Transforms source employees objects.
        """
        for employee in source_employees:
            yield TargetEmployee(
                employee.id, # Reference use only, NOT INSERTED INTO DB
                employee.gender,
                employee.did,
                employee.jid,
                self.tid,
                cid,
                self._apply_exchange_rate(employee.salary),
                employee.manager,
                get_age(employee.dob, self.today))
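
The excerpt calls an exchange-rate helper whose body is not shown. As a rough sketch of what such a conversion could look like, assuming a single fixed rate per source database and rounding to two decimal places (the function name, its signature, and the rates below are assumptions, not the contribution's actual code):

```python
from decimal import Decimal, ROUND_HALF_UP


def apply_exchange_rate(salary, rate):
    """Converts a salary into the target currency (hypothetical helper).

    Values go through str() so that floats do not leak binary
    rounding noise into the Decimal arithmetic.
    """
    converted = Decimal(str(salary)) * Decimal(str(rate))
    return converted.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(apply_exchange_rate(1000, "1.25"))
```

Using Decimal rather than float keeps converted salaries exact to the cent, which matters once totals are summed over many rows.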

As an example, the following function calculates an employee's age.

Excerpt of utils/__init__.py

def get_age(dob, current_date):
    """
    Returns person's age as of current date given the date of birth.
    """
    try:
        bday = dob.replace(year=current_date.year)
    except ValueError:
        # This happens if dob falls on 29th Feb and the current year is
        # not a leap year.
        bday = dob.replace(year=current_date.year, day=dob.day - 1)
    if bday > current_date:
        age = current_date.year - dob.year - 1
    else:
        age = current_date.year - dob.year
    return age
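
The leap-year branch is easiest to follow with concrete dates. The sketch below repeats the function in condensed form so that it runs on its own; the sample dates are made up. A person born on 29 February is treated as having their birthday on 28 February in non-leap years.

```python
import datetime


def get_age(dob, current_date):
    """Condensed copy of the function above, kept self-contained."""
    try:
        bday = dob.replace(year=current_date.year)
    except ValueError:
        # dob is 29 Feb and current_date.year is not a leap year.
        bday = dob.replace(year=current_date.year, day=dob.day - 1)
    if bday > current_date:
        return current_date.year - dob.year - 1
    return current_date.year - dob.year


leap_dob = datetime.date(1980, 2, 29)
print(get_age(leap_dob, datetime.date(2013, 2, 27)))  # 32, birthday not reached
print(get_age(leap_dob, datetime.date(2013, 2, 28)))  # 33, counted on 28 Feb
print(get_age(datetime.date(1975, 7, 1), datetime.date(2013, 6, 30)))  # 37
```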

Load

After transformation, the source data is held in a dedicated target model.

Excerpt of etl/models.py

class TargetEmployee(object):
    """
    Represents the row data for the tgt_N.fact_employee table:
    id, gender, did, jid, tid, cid, salary, manager, age
    """
    def __init__(self, *args):
        assert len(args) == 9
        self.id = args[0]
        self.gender = args[1]
        self.did = args[2]
        self.jid = args[3]
        self.tid = args[4]
        self.cid = args[5]
        self.salary = args[6]
        self.manager = args[7]
        self.age = args[8]

Finally, the target models are saved to a target database that uses a different schema (a star schema, only partially normalized).

ERM of target database schema

media:https://raw.github.com/ydupont/101repo/master/wiki/feature dimensionality/img/tgt er.png

NOTE: The enum 'interval' can have the values 'DAILY','MONTHLY' and 'YEARLY'
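
The load step can be sketched as a batch insert into the fact table. This is an illustration only: it uses an in-memory SQLite database instead of MySQL so that it runs anywhere, the table definition is a simplified guess derived from the model's docstring, and `load_employees` is a hypothetical helper. The source id is left out of the insert, in line with the "NOT INSERTED INTO DB" comment in the transform excerpt.

```python
import sqlite3


class TargetEmployee(object):
    """Same shape as the excerpt: nine positional values."""
    def __init__(self, *args):
        assert len(args) == 9
        (self.id, self.gender, self.did, self.jid, self.tid,
         self.cid, self.salary, self.manager, self.age) = args


def load_employees(conn, employees):
    """Inserts target employees into fact_employee (hypothetical helper)."""
    rows = [(e.gender, e.did, e.jid, e.tid, e.cid, e.salary, e.manager, e.age)
            for e in employees]
    # Placeholders let the driver handle quoting of the values.
    conn.executemany(
        "INSERT INTO fact_employee "
        "(gender, did, jid, tid, cid, salary, manager, age) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows)
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
# Assumed, simplified table layout; the real schema is in sql/schema/tgt.sql.
conn.execute(
    "CREATE TABLE fact_employee ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, gender TEXT, did INTEGER, "
    "jid INTEGER, tid INTEGER, cid INTEGER, salary REAL, manager INTEGER, "
    "age INTEGER)")
inserted = load_employees(conn, [
    TargetEmployee(1, "FEMALE", 10, 3, 1, 1, 50000.0, 1, 33),
    TargetEmployee(2, "MALE", 10, 4, 1, 1, 42000.0, 0, 37),
])
print(inserted)
```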

Chart generation

After running the ETL several times, the target database holds a sufficient dataset that references historic states of the source databases. The database can now be used to create a chart with selected dimensions. The following class generates the SQL queries needed for chart generation, based on the user's choices.

Excerpt of chart/query.py

class Query(object):
    """
    Performs database queries to fetch all the required lines data in order to
    generate a chart.
    """
    def __init__(self, database, chart):
        self.database = database
        self.chart = chart

    def _split_line_range(self, line_range):
        """
        Splits a line range into a tuple representing start and end of the
        range. This is typically used for age range.
        """
        line_range = line_range.split("-")
        if len(line_range) > 1:
            (start, end) = line_range
        else:
            start = line_range[0]
            end = start
        return (start, end)

    def _get_tables(self):
        """
        Returns a list of tables that forms the FROM clause for a query given
        the data type.
        """
        if self.chart['data_type'] == JOBROLE:
            return "fact_employee e, dim_time d, dim_jobrole j"
        return "fact_employee e, dim_time d"

    def _get_time_limit(self):
        """
        Returns a time limit string that forms the WHERE clause for a query
        given the x-axis inputs.
        """
        x_unit = self.chart['x']['unit'].upper()  # x-axis for time
        x_limit = self.chart['x']['limit']  # (start, end) or None
        time_limit = "e.tid = d.id AND d.time_interval = '{}'".format(x_unit)
        if x_limit is not None:
            time_limit += " AND d.date >= '{}' AND d.date <= '{}'".format(
                x_limit[0], x_limit[1])
        return time_limit

    def _get_data_limit(self, line_range):
        """
        Returns a data limit string that forms the WHERE clause for a query
        given the data label.
        """
        data_limit = ""
        data_type = self.chart['data_type']
        if data_type == AGE:
            (start, end) = self._split_line_range(line_range)
            data_limit = "e.age >= {} AND e.age <= {}".format(start, end)
        elif data_type == COMPANY:
            data_limit = "e.cid = {}".format(int(line_range))
        elif data_type == GENDER:
            data_limit = "e.gender = '{}'".format(line_range.upper())
        elif data_type == JOBROLE:
            data_limit = "e.jid = j.id AND j.name = '{}'".format(line_range)
        elif data_type == MANAGER:
            data_limit = "e.manager = {}".format(int(line_range))
        return data_limit

    def _get_data_select(self, context):
        """
        Returns a select statement that forms a joinable query given the
        unit for y-axis, i.e. median or total.
        """
        query = ""
        if self.chart['y']['unit'] == MEDIAN:
            query = (
                "(SELECT date AS line{idx}_date, "
                "AVG(salary) AS line{idx}_value "
                "FROM ( "
                "SELECT t1.row, t1.salary, t1.date FROM ( "
                "SELECT IF(@prev != date, @rownum := 1, @rownum := @rownum + 1) AS row, @prev := date AS date, salary "
                "FROM ( "
                "SELECT d.date, e.salary "
                "FROM {tables} "
                "WHERE {data_limit} AND {time_limit} "
                "ORDER BY date, salary "
                ") ordered, (SELECT @rownum := 0, @prev := NULL) reset "
                ") AS t1 INNER JOIN "
                "( "
                "SELECT COUNT(*) AS total_rows, d.date AS date "
                "FROM {tables} "
                "WHERE {data_limit} AND {time_limit} "
                "GROUP BY date "
                ") AS t2 "
                "ON t1.date = t2.date "
                "AND t1.row >= t2.total_rows / 2 AND t1.row <= ((t2.total_rows / 2) + 1) "
                ") median "
                "GROUP BY date ORDER BY date) line{idx} ")
        elif self.chart['y']['unit'] == TOTAL:
            query = (
                "(SELECT d.date AS line{idx}_date, "
                "SUM(e.salary) AS line{idx}_value "
                "FROM {tables} "
                "WHERE {data_limit} AND {time_limit} "
                "GROUP BY date ORDER BY date) line{idx} ")
        return query.format(idx=context['idx'],
                            tables=context['tables'],
                            data_limit=context['data_limit'],
                            time_limit=context['time_limit'])

    def get_data_rows(self, line_ranges):
        """
        Returns data rows for use in rendering the chart given the line ranges.
        """
        left_column = "COALESCE("  # Header for first column
        right_columns = ""  # Headers for subsequent columns
        joins = " "
        tables = self._get_tables()
        time_limit = self._get_time_limit()

        for idx, line_range in enumerate(line_ranges, 1):
            data_limit = self._get_data_limit(line_range)

            joins += self._get_data_select({
                'idx': idx,
                'tables': tables,
                'data_limit': data_limit,
                'time_limit': time_limit,
            })

            if idx > 1:
                joins += "ON line{}_date = line{}_date ".format(idx - 1, idx)
            if idx != len(line_ranges):
                joins += "LEFT JOIN "

            left_column += "line{}_date".format(idx)
            if idx < len(line_ranges):
                left_column += ", "
            else:
                left_column += ") AS date,"

            right_columns += "line{}_value".format(idx)
            if idx < len(line_ranges):
                right_columns += ", "

        query = "SELECT {} {} FROM".format(left_column, right_columns) + joins
        logging.debug("query={}".format(query))

        try:
            self.database.cursor.execute(query)
        except ProgrammingError as err:
            raise QueryError(err)
        rows = self.database.cursor.fetchall()

        logging.debug("rows={}".format(rows))
        return rows
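
The median branch of the query numbers the salaries of each date in ascending order and averages the rows whose number lies between half the row count and half the row count plus one. The sketch below mirrors that row-window idea in plain Python for a single date, assuming that MySQL's `/` operator yields a fractional result (as Python 3's `/` does); the function name is made up and the code is an illustration, not part of chart/query.py.

```python
import statistics


def median_by_row_window(salaries):
    """Median via the same row window as the SQL (hypothetical helper)."""
    if not salaries:
        raise ValueError("no salaries given")
    ordered = sorted(salaries)
    total = len(ordered)
    # Row numbers start at 1, like @rownum in the query.
    window = [salary for row, salary in enumerate(ordered, 1)
              if total / 2 <= row <= total / 2 + 1]
    return sum(window) / len(window)


odd = [42000, 50000, 38000]
even = [42000, 50000, 38000, 61000]
print(median_by_row_window(odd), statistics.median(odd))
print(median_by_row_window(even), statistics.median(even))
```

For an odd row count the window holds exactly the middle row; for an even count it holds the two middle rows, so the average equals the usual median in both cases.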

Lastly, a chart is generated using the Google Charts API.

media:https://raw.github.com/ydupont/pyDWH/master/doc/sample chart.png

Usage

Requirements

Preparation

mysql> CREATE DATABASE src_1;
mysql> CREATE DATABASE src_2;
mysql> CREATE DATABASE tgt;

NOTE: Grant privileges in case you don't want to use the root user. You can use "n" source databases.

Load the source and target database schemas:

$ mysql -u <USERNAME> -p -h <HOSTNAME> src_1 < sql/schema/src.sql
$ mysql -u <USERNAME> -p -h <HOSTNAME> src_2 < sql/schema/src.sql
$ mysql -u <USERNAME> -p -h <HOSTNAME> tgt < sql/schema/tgt.sql

Populate the databases on your own or using the sample data:

$ mysql -u <USERNAME> -p -h <HOSTNAME> src_1 < sql/sample/src_1/src_1_1.sql
$ mysql -u <USERNAME> -p -h <HOSTNAME> src_2 < sql/sample/src_2/src_2_1.sql

Execute

Please check the configuration file (config.cfg) and adjust it according to your environment. You may add new "source" sections.

To use the ETL in production mode, install a new cron job that runs the following command:

$ python run_etl.py -c config.cfg
(please ensure that you are in the proper directory)

To use the ETL in simulation mode, change the corresponding options in the configuration file and run both the ETL and the database updates manually. You can do this with the packaged sample SQL files:

$ mysql -u <USERNAME> -p -h <HOSTNAME> src_1 < sql/sample/src_1/src_1_1.sql
$ mysql -u <USERNAME> -p -h <HOSTNAME> src_2 < sql/sample/src_2/src_2_1.sql
$ python run_etl.py -c config.cfg

$ mysql -u <USERNAME> -p -h <HOSTNAME> src_1 < sql/sample/src_1/src_1_2.sql
$ mysql -u <USERNAME> -p -h <HOSTNAME> src_2 < sql/sample/src_2/src_2_2.sql
$ python run_etl.py -c config.cfg

........................................

$ mysql -u <USERNAME> -p -h <HOSTNAME> src_1 < sql/sample/src_1/src_1_10.sql
$ mysql -u <USERNAME> -p -h <HOSTNAME> src_2 < sql/sample/src_2/src_2_10.sql
$ python run_etl.py -c config.cfg

Now you can generate a chart, for example:

$ python generate_chart.py -x time -X yearly -y salary -Y median -d company -D 1,2
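
The value passed to -D is a comma-separated list in which each item becomes one plotted line; for the age dimension an item may be a range such as 26-35. A minimal sketch of how such a value could be broken down, assuming the string is simply split on commas (the helper `parse_line_ranges` is hypothetical, while `split_line_range` follows the range-splitting method shown in chart/query.py):

```python
def split_line_range(line_range):
    """Splits "26-35" into ("26", "35"); a single value is used twice."""
    parts = line_range.split("-")
    if len(parts) > 1:
        start, end = parts
    else:
        start = end = parts[0]
    return (start, end)


def parse_line_ranges(data_output):
    """Splits the -D value on commas, dropping empty items (hypothetical)."""
    return [item.strip() for item in data_output.split(",") if item.strip()]


ranges = parse_line_ranges("26-35,36-45")
print([split_line_range(r) for r in ranges])
print(split_line_range("30"))
```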

Command-Line parameters

$ generate_chart.py --help
  Usage: generate_chart.py [options]
  Script to generate chart using data from target database.
  Examples:
  generate_chart.py -x time -X yearly -y salary -Y median -d age -D 26-35,36-45
  generate_chart.py -x time -X 2000,2010 -y salary -Y median -d jobrole -D "Project Manager"
  See generate_chart.py --help for supported options.

  Options:
    -h, --help            show this help message and exit
    -c CONFIG, --config=CONFIG
                          Configuration file
    -v, --verbose         Show verbose output
    -x X_AXIS, --x_axis=X_AXIS
                          Label for x-axis: time
    -X X_OUTPUT, --x_output=X_OUTPUT
                          Unit for x-axis. time: daily, month, yearly or range
                          2000,2003. Range for time is separated by comma to
                          allow daily range such as -X 2000-01-01,2003-08-20
    -y Y_AXIS, --y_axis=Y_AXIS
                          Label for y-axis: salary
    -Y Y_OUTPUT, --y_output=Y_OUTPUT
                          Unit for y-axis. salary: median or total
    -d DATA_TYPE, --data_type=DATA_TYPE
                          Data type for plotted lines: age, company, gender,
                          jobrole or manager
    -D DATA_OUTPUT, --data_output=DATA_OUTPUT
                          Unit for plotted lines. age: comma-separated list of
                          ages or ranges, company: comma-separated list of IDs,
                          gender: comma-separated list (male,female, undefined
                          or empty for all), jobrole: comma-separated list of
                          job roles, manager: comma-separated list (true,false)

$ run_etl.py --help
  Usage: run_etl.py [options]
  Script to perform extract, transform and load flow.
  Example: run_etl.py --config=config.cfg --verbose
  See run_etl.py --help for supported options.

  Options:
    -h, --help            show this help message and exit
    -c CONFIG, --config=CONFIG
                          Configuration file
    -v, --verbose         Show verbose output
