Source code for distributed_trajectories.TM


"""
    Transition Matrix
    =================
"""

import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import ArrayType, FloatType

try:
    # imports for pytest and documentation
    from distributed_trajectories.consts import width, lat_cells, lon_cells, delta_avg_ts
    from distributed_trajectories.udfs import d1_state_vector, updates_to_the_transition_matrix
except:
    # imports for running the package.
    from consts import width, lat_cells, lon_cells, delta_avg_ts
    from udfs import d1_state_vector, updates_to_the_transition_matrix


[docs]class TM: """ creates Transition Matrix """ def __init__(self, df): """ takes PySpark DF to create TM :param df: PySpark dataframe """ self.df = df
[docs] def set_d1_state_vector(self): """ returns 1D representation for the distributed state :return: Transition Matrix object """ d1_state_vector_udf = F.udf(d1_state_vector, ArrayType(ArrayType(FloatType())) ) window = Window.partitionBy([F.col('id'), F.to_date(F.col('avg_ts'))]).orderBy(F.col('avg_ts')) self.df = self.df \ .withColumn('d1_states', d1_state_vector_udf(F.col('lon_idx'), F.col('lat_idx'), F.lit(width), F.lit(lon_cells), F.lit(lat_cells)) ) \ .withColumn('d1_states_lag', F.lag(F.col('d1_states')).over(window)) \ .dropna()
[docs] def set_timestamp_delta(self): """ adds a time delta column, the difference in time between successive observations :return: Transition Matrix object """ window = Window.partitionBy([F.col('id'), F.to_date(F.col('avg_ts'))]).orderBy(F.col('avg_ts')) self.df = self.df.withColumn('delta_avg_ts', F.col('avg_ts').cast('long') - F.lag(F.col('avg_ts')).over(window).cast('long'))
[docs] def set_TM_updates(self): """ for each pair of (origin, destination) create entry for the OD matrix :return: Transition Matrix object """ updates_to_TM_udf = F.udf(updates_to_the_transition_matrix, ArrayType(ArrayType(FloatType())) ) self.df = self.df.withColumn('updates_to_TM', updates_to_TM_udf(F.col('d1_states'), F.col('d1_states_lag')))
[docs] def collect_TM_updates(self): """ sums up the contributions for different Origin-Destination pairs :return: PySpark DF """ TM = self.df.select(['updates_to_TM', 'delta_avg_ts']) \ .withColumn('updates_to_TM', F.explode('updates_to_TM')) \ .withColumn("x", F.col('updates_to_TM').getItem(0)) \ .withColumn("y", F.col('updates_to_TM').getItem(1)) \ .withColumn("val", F.col('updates_to_TM').getItem(2)) \ .filter(F.col('delta_avg_ts') < delta_avg_ts)\ .drop('updates_to_TM') \ .groupBy(['x', 'y']).agg(F.sum('val').alias('updates_to_TM')) return TM
[docs] @staticmethod def normalize_tm(tm): """ normalizing TM, so the sum over each row is 1 :return: Transition Matrix """ window = Window.partitionBy(F.col('x')) # .orderBy(F.col('y')) tm = tm \ .withColumn('updates_to_TM', F.col('updates_to_TM') / F.sum(F.col('updates_to_TM')).over(window)) # .withColumn('number_of_items', F.count(F.col('y')).over(window)) \ # .filter(F.col('number_of_items') > 10) return tm
[docs] def make_tm(self): """ includes all the steps for OD :return: PySpark DF, transition matrix """ print('setting 1D state vector') self.set_d1_state_vector() print("setting timestamp delta") self.set_timestamp_delta() print("setting TM updates") self.set_TM_updates() print("aggregating TM updates") tm = self.collect_TM_updates() tm = self.normalize_tm(tm) return tm