Source code for distributed_trajectories.OD
"""
Origin-Destination Matrix
=========================
"""
from pyspark.sql.types import ArrayType, FloatType
import pyspark.sql.functions as F
from pyspark.sql import Window
try:
    # Imports used when the module is loaded as part of the installed
    # package (pytest runs and documentation builds).
    from distributed_trajectories.udfs import d1_state_vector, updates_to_the_transition_matrix
    from distributed_trajectories.consts import width, lat_cells, lon_cells, OD_time_frame
except ImportError:
    # Fallback imports for running the package's modules directly.
    # Only ImportError (which includes ModuleNotFoundError) triggers the
    # fallback, so genuine errors raised while importing the package
    # modules are not hidden.
    from udfs import d1_state_vector, updates_to_the_transition_matrix
    from consts import width, lat_cells, lon_cells, OD_time_frame
class OD:
    """
    Calculates the Origin-Destination matrix
    for the given PySpark DataFrame.

    The input frame is read through the columns ``id``, ``avg_ts``,
    ``lat_idx`` and ``lon_idx``. The intermediate results of every step are
    stored back on ``self.df``; :meth:`make_od` runs all steps in order.
    """

    def __init__(self, df):
        """
        :param df: PySpark DataFrame
        """
        self.df = df

    def set_d1_state_vector(self):
        """
        Adds the 1D representation of the distributed state for the first
        and the last observation inside each row's time window.

        The window is partitioned by ``id`` and by the calendar date of
        ``avg_ts``, ordered by ``ts`` and spans
        ``[-OD_time_frame, +OD_time_frame]`` around the current row.

        Columns added to ``self.df``: ``ts``, ``first_ts``, ``last_ts``,
        ``first_lat_idx``, ``last_lat_idx``, ``first_lon_idx``,
        ``last_lon_idx``, ``d1_states1`` (state of the first observation) and
        ``d1_states2`` (state of the last observation).

        :return: None, the result is stored in ``self.df``
        """
        d1_state_vector_udf = F.udf(
            d1_state_vector,
            ArrayType(ArrayType(FloatType())),
        )

        window = (
            Window.partitionBy(['id', F.to_date('avg_ts')])
            .orderBy('ts')
            .rangeBetween(-OD_time_frame, OD_time_frame)
        )

        # The chain is wrapped in parentheses instead of backslash
        # continuations, so no trailing backslash can splice the following
        # source line into this statement.
        self.df = (
            self.df
            # Numeric copy of avg_ts: the range window needs a numeric
            # ordering column. NOTE(review): presumably epoch seconds, i.e.
            # OD_time_frame is expressed in seconds -- confirm in consts.
            .withColumn('ts', F.col('avg_ts').cast('long'))
            .withColumn('first_ts', F.first('avg_ts').over(window))
            .withColumn('last_ts', F.last('avg_ts').over(window))
            .withColumn('first_lat_idx', F.first('lat_idx').over(window))
            .withColumn('last_lat_idx', F.last('lat_idx').over(window))
            .withColumn('first_lon_idx', F.first('lon_idx').over(window))
            .withColumn('last_lon_idx', F.last('lon_idx').over(window))
            .withColumn(
                'd1_states1',
                d1_state_vector_udf(
                    F.col('first_lon_idx'),
                    F.col('first_lat_idx'),
                    F.lit(width),
                    F.lit(lon_cells),
                    F.lit(lat_cells),
                ),
            )
            .withColumn(
                'd1_states2',
                d1_state_vector_udf(
                    F.col('last_lon_idx'),
                    F.col('last_lat_idx'),
                    F.lit(width),
                    F.lit(lon_cells),
                    F.lit(lat_cells),
                ),
            )
        )

    def set_OD_updates(self):
        """
        For each pair of (origin, destination) creates the entries for the
        OD matrix.

        Reads the columns ``d1_states1`` and ``d1_states2`` (added by
        :meth:`set_d1_state_vector`) and stores the result in the new column
        ``updates_to_OD``.

        :return: None, the result is stored in ``self.df``
        """
        updates_to_OD_udf = F.udf(
            updates_to_the_transition_matrix,
            ArrayType(ArrayType(FloatType())),
        )

        self.df = self.df.withColumn(
            'updates_to_OD',
            updates_to_OD_udf(F.col('d1_states1'), F.col('d1_states2')),
        )

    def collect_OD_updates(self):
        """
        Sums up the contributions for the different Origin-Destination pairs.

        Every element of the ``updates_to_OD`` arrays is unpacked by position
        into ``x`` (item 0), ``y`` (item 1) and ``val`` (item 2); the values
        are then summed per ``(x, y)`` pair.

        :return: PySpark DataFrame with the columns ``x``, ``y`` and
            ``updates_to_OD`` (the summed ``val``)
        """
        OD = (
            self.df.select('updates_to_OD')
            # One output row per inner array of the update list.
            .withColumn('updates_to_OD', F.explode('updates_to_OD'))
            .withColumn("x", F.col('updates_to_OD').getItem(0))
            .withColumn("y", F.col('updates_to_OD').getItem(1))
            .withColumn("val", F.col('updates_to_OD').getItem(2))
            .drop('updates_to_OD')
            .groupBy(['x', 'y'])
            .agg(F.sum('val').alias('updates_to_OD'))
        )
        return OD

    def make_od(self):
        """
        Runs all the steps for the OD matrix in order: state vectors,
        per-row updates, aggregation. A progress message is printed before
        each step.

        :return: the aggregated DataFrame produced by
            :meth:`collect_OD_updates`
        """
        print('setting 1D state vector')
        self.set_d1_state_vector()

        print("setting OD updates")
        self.set_OD_updates()

        print("aggregating OD updates")
        od = self.collect_OD_updates()

        return od