Source code for tests.test_base

"""
    =============================================================
    Tests for `distributed_trajectories.distributed_trajectories`
    =============================================================
"""


import  pyspark.sql.functions as F
from distributed_trajectories.distributed_trajectories import PrepareDataset
from pyspark.sql import  Window
from pyspark.sql.types import  TimestampType

from distributed_trajectories.consts import spark



[docs]class TestsBase: """ Testing """
[docs] def test_set_helper_column(self, mocker): """ checking how `helper` column is being created. The `helper` column has the following meaning. Its value increases by 1 as soon as an object leaves the 2D cell. :param mocker: :return: false/True """ expected = [1, 2, 2, 2, 3, 1, 2] def mock_read(self): tst = spark.createDataFrame( [(2, '2008-02-04 00:14:04', 116.385, 39.98, 10, 10), (2, '2008-02-04 00:19:04', 116.355, 40.02, 9, 11), (2, '2008-02-04 00:24:04', 116.325, 40.06, 8, 12), (1, '2008-02-04 01:04:04', 116.325, 40.02, 8, 11), (1, '2008-02-04 01:09:04', 116.325, 39.98, 8, 10), (1, '2008-02-04 01:14:04', 116.355, 39.98, 9, 10), (1, '2008-02-04 01:19:04', 116.355, 39.98, 9, 10), (1, '2008-02-04 01:24:04', 116.355, 39.98, 9, 10), (1, '2008-02-04 01:34:04', 116.415, 39.98, 11, 10) ], ['id', 'ts', 'lon_middle', 'lat_middle', 'lon_idx', 'lat_idx'] ) \ .withColumn('ts', F.col('ts').cast(TimestampType())) return tst mocker.patch( 'distributed_trajectories.distributed_trajectories.PrepareDataset.read', mock_read ) actual = PrepareDataset('').set_helper_column().select('helper').collect() actual = [v[0] for v in actual] assert expected == actual
[docs] def test_helper_column_set_avg_time_for_cell(self, mocker): """ the sequence of values in the `helper` column must be a increasing by 1 sequence for each `ID` and each `date`. :param mocker: :return: true/false """ def mock_read(self): tst = spark.createDataFrame( [ (2, '2008-02-04 00:19:04', 116.355, 40.02, 9, 11, 1), (2, '2008-02-04 00:24:04', 116.325, 40.06, 8, 12, 2), (1, '2008-02-04 01:09:04', 116.325, 39.98, 8, 10, 1), (1, '2008-02-04 01:14:04', 116.355, 39.98, 9, 10, 2), (1, '2008-02-04 01:19:04', 116.355, 39.98, 9, 10, 2), (1, '2008-02-04 01:24:04', 116.355, 39.98, 9, 10, 2), (1, '2008-02-04 01:34:04', 116.415, 39.98, 11, 10, 3) ], ['id', 'ts', 'lon_middle', 'lat_middle', 'lon_idx', 'lat_idx', 'helper'] ) \ .withColumn('ts', F.col('ts').cast(TimestampType())) return tst mocker.patch( 'distributed_trajectories.distributed_trajectories.PrepareDataset.read', mock_read ) actual = PrepareDataset('').set_avg_time_for_cell() window = Window.partitionBy([F.col('id'), F.to_date(F.col('avg_ts'))]).orderBy(F.col('avg_ts')) res = actual.withColumn('check_helper', F.lag(F.col('helper')).over(window) - F.col('helper')) \ .select(F.col('check_helper')) \ .filter(F.col('check_helper') != -1) \ .count() assert res == 0