Modules

Origin-Destination Matrix

class distributed_trajectories.OD.OD(df)

Calculates the Origin-Destination (OD) matrix for the given PySpark DataFrame.

collect_OD_updates()

Sums up the contributions for the different Origin-Destination pairs.
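The summation step can be pictured in plain Python. This is a minimal sketch, not the library's code: it assumes each contribution is a hypothetical `(origin, destination, weight)` tuple in 1D cell notation, whereas the class itself works on a PySpark DataFrame.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# Hypothetical representation of one contribution: (origin, destination, weight)
Update = Tuple[int, int, float]


def collect_od_updates(updates: Iterable[Update]) -> Dict[Tuple[int, int], float]:
    """Sum the weights of all contributions that share the same (origin, destination) pair."""
    od: Dict[Tuple[int, int], float] = defaultdict(float)
    for origin, destination, weight in updates:
        od[(origin, destination)] += weight
    return dict(od)


updates = [(0, 5, 0.5), (0, 5, 0.25), (3, 1, 1.0)]
print(collect_od_updates(updates))  # {(0, 5): 0.75, (3, 1): 1.0}
```

On a PySpark DataFrame the same step would typically be a `groupBy` on the origin and destination columns followed by a `sum` aggregation; the column names are assumptions.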

make_od()

Runs all the steps needed to build the OD matrix.

set_OD_updates()

For each (origin, destination) pair, creates an entry for the OD matrix.

set_d1_state_vector()

Computes the 1D representation of the distributed state.


Transition Matrix

class distributed_trajectories.TM.TM(df)

Creates the Transition Matrix (TM) for the given PySpark DataFrame.

collect_TM_updates()

Sums up the contributions for the different Origin-Destination pairs.

Returns

PySpark DF

make_tm()

Runs all the steps needed to build the transition matrix.

Returns

PySpark DF, transition matrix

static normalize_tm(tm)

Normalizes the TM so that each row sums to 1.

Returns

Transition Matrix
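Row normalization can be illustrated on a small sparse matrix. This sketch assumes a hypothetical dictionary representation `{(row, col): value}` rather than the PySpark DataFrame the method actually receives.

```python
from collections import defaultdict
from typing import Dict, Tuple

# Hypothetical sparse matrix representation: (row, col) -> value
Matrix = Dict[Tuple[int, int], float]


def normalize_rows(tm: Matrix) -> Matrix:
    """Divide every entry by its row sum so that each non-empty row sums to 1."""
    row_sums: Dict[int, float] = defaultdict(float)
    for (row, _col), value in tm.items():
        row_sums[row] += value
    return {
        (row, col): value / row_sums[row]
        for (row, col), value in tm.items()
        if row_sums[row] != 0  # skip rows whose total is zero to avoid division by zero
    }


tm = {(0, 0): 1.0, (0, 1): 3.0, (1, 1): 2.0}
print(normalize_rows(tm))  # {(0, 0): 0.25, (0, 1): 0.75, (1, 1): 1.0}
```

After normalization each row can be read as the probability distribution of the next cell given the current one.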

set_TM_updates()

For each (origin, destination) pair, creates an entry for the transition matrix.

Returns

Transition Matrix object

set_d1_state_vector()

Computes the 1D representation of the distributed state.

Returns

Transition Matrix object

set_timestamp_delta()

Adds a time-delta column: the time difference between successive observations.

Returns

Transition Matrix object
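The time-delta logic can be sketched in plain Python. This assumes hypothetical `(object_id, unix_timestamp)` records and that deltas are computed per object in time order; the first observation of each object gets no delta.

```python
from itertools import groupby
from operator import itemgetter
from typing import List, Optional, Tuple

# Hypothetical record layout: (object_id, unix_timestamp)
Record = Tuple[str, int]


def add_timestamp_delta(records: List[Record]) -> List[Tuple[str, int, Optional[int]]]:
    """Attach to each record the time elapsed since the previous observation of the same object."""
    out = []
    # Sorting by (object_id, timestamp) puts each object's observations in time order
    for obj_id, group in groupby(sorted(records), key=itemgetter(0)):
        prev_ts = None
        for _, ts in group:
            out.append((obj_id, ts, None if prev_ts is None else ts - prev_ts))
            prev_ts = ts
    return out


records = [("a", 100), ("b", 50), ("a", 160), ("a", 130)]
print(add_timestamp_delta(records))
# [('a', 100, None), ('a', 130, 30), ('a', 160, 30), ('b', 50, None)]
```

In PySpark the same column would typically be built with `pyspark.sql.functions.lag` over a window partitioned by ID and ordered by timestamp.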


User Defined Functions

distributed_trajectories.udfs.check_central_position(m, n, i, j, width)

Checks whether the distribution given by the center (i, j) and the width lies entirely inside the grid defined by m and n.

\(i \in \{1, \dots, m\}\), \(j \in \{1, \dots, n\}\), where m is the number of cells along latitude and n is the number of cells along longitude.

Parameters
  • m – number of cells along latitude

  • n – number of cells along longitude

  • i – current position along latitude

  • j – current position along longitude

  • width – width of the distribution of state

Returns

True or False
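A minimal sketch of such a check, assuming 1-based indices and that width counts the cells on each side of the center, so the distribution covers a (2·width + 1) × (2·width + 1) block. The function name `fits_inside_grid` is hypothetical.

```python
def fits_inside_grid(m: int, n: int, i: int, j: int, width: int) -> bool:
    """Return True if the block of cells within `width` of (i, j) lies in the m x n grid.

    Assumes 1-based indices i in [1, m], j in [1, n] and that width counts cells on each side.
    """
    return 1 <= i - width and i + width <= m and 1 <= j - width and j + width <= n


print(fits_inside_grid(m=10, n=10, i=5, j=5, width=2))  # True
print(fits_inside_grid(m=10, n=10, i=1, j=5, width=1))  # False: the block would reach row 0
```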

distributed_trajectories.udfs.d1_coords(d2_vals, m)

Transforms 2D coordinates, together with their values, into 1D coordinates.

distributed_trajectories.udfs.d1_coords_pure(d2_c, m)

Transforms 2D coordinates into 1D coordinates, without values.
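One possible 2D-to-1D mapping is sketched below. The exact convention used by the library is not documented here; this sketch assumes 1-based (i, j) with i ∈ [1, m] along latitude varying fastest, which is a bijection onto 0..m·n−1 and needs only m, matching the signatures above. All function names are hypothetical.

```python
from typing import List, Tuple


def to_1d(i: int, j: int, m: int) -> int:
    """Map 1-based (i, j), i in [1, m], to a 0-based 1D index; i varies fastest (assumed convention)."""
    return (j - 1) * m + (i - 1)


def d1_coords_pure_sketch(d2_c: List[Tuple[int, int]], m: int) -> List[int]:
    """Convert a list of 2D cells to 1D indices."""
    return [to_1d(i, j, m) for i, j in d2_c]


def d1_coords_sketch(d2_vals: List[Tuple[int, int, float]], m: int) -> List[Tuple[int, float]]:
    """Convert (i, j, value) triples to (k, value) pairs, keeping each value."""
    return [(to_1d(i, j, m), v) for i, j, v in d2_vals]


print(d1_coords_pure_sketch([(1, 1), (3, 1), (1, 2)], m=3))  # [0, 2, 3]
print(d1_coords_sketch([(2, 2, 0.5)], m=3))  # [(4, 0.5)]
```

The inverse of this assumed mapping is i = k mod m + 1 and j = ⌊k / m⌋ + 1.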

distributed_trajectories.udfs.d1_state_vector(i, j, width, m, n)

Given the i and j positions on the lattice and the width of the distribution, produces a 1D state-vector representation of length \(m \times n\).
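The idea of a distributed state can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: it spreads unit probability mass uniformly over the cells within width of (i, j) that fall inside the grid (the library may instead use Gaussian weights, as `put_gauss_on_mesh` suggests), and it uses the assumed 1D index k = (j − 1)·m + (i − 1).

```python
from typing import List


def state_vector(i: int, j: int, width: int, m: int, n: int) -> List[float]:
    """Spread unit probability mass uniformly over cells within `width` of (i, j).

    Assumptions: 1-based (i, j), i in [1, m], j in [1, n]; 1D index k = (j - 1) * m + (i - 1);
    uniform weights (the library may weight cells differently, e.g. with a Gaussian).
    """
    cells = [
        (a, b)
        for a in range(i - width, i + width + 1)
        for b in range(j - width, j + width + 1)
        if 1 <= a <= m and 1 <= b <= n  # drop neighbours that fall outside the grid
    ]
    vec = [0.0] * (m * n)
    for a, b in cells:
        vec[(b - 1) * m + (a - 1)] = 1.0 / len(cells)
    return vec


v = state_vector(i=2, j=2, width=1, m=3, n=3)
print(len(v), round(sum(v), 9))  # 9 1.0
```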

distributed_trajectories.udfs.d2_coords(i, j, width=1)

Given (i, j), returns the coordinates of the neighbouring cells within width.
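A sketch of the neighbourhood enumeration, assuming a square neighbourhood (all cells whose row and column offsets are both at most width, the center included) and no border checks, which `check_central_position` covers separately. The name `neighbour_cells` is hypothetical.

```python
from typing import List, Tuple


def neighbour_cells(i: int, j: int, width: int = 1) -> List[Tuple[int, int]]:
    """All cells (a, b) with |a - i| <= width and |b - j| <= width, the center included.

    Assumes a square neighbourhood; grid borders are not checked here.
    """
    return [
        (i + di, j + dj)
        for di in range(-width, width + 1)
        for dj in range(-width, width + 1)
    ]


print(neighbour_cells(5, 5))
# [(4, 4), (4, 5), (4, 6), (5, 4), (5, 5), (5, 6), (6, 4), (6, 5), (6, 6)]
```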

distributed_trajectories.udfs.filter_OD(origins, destinations)

Takes lists of origins and destinations (in 1D notation) and returns a list of tuples with OD coordinates.

distributed_trajectories.udfs.make_mesh(width=1)

Creates a mesh of the given width.
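A plain-Python sketch of such a mesh, assuming it spans integer offsets from −width to width around the center and follows the X/Y layout of `numpy.meshgrid` with its default `indexing="xy"`, so it can be passed to `put_gauss_on_mesh(X, Y)`. The name `make_mesh_sketch` is hypothetical.

```python
from typing import List, Tuple


def make_mesh_sketch(width: int = 1) -> Tuple[List[List[int]], List[List[int]]]:
    """Return X and Y coordinate grids covering offsets -width..width, laid out like numpy.meshgrid."""
    axis = list(range(-width, width + 1))
    X = [[x for x in axis] for _ in axis]  # each row repeats the x offsets
    Y = [[y for _ in axis] for y in axis]  # each row holds a constant y offset
    return X, Y


X, Y = make_mesh_sketch(1)
print(X)  # [[-1, 0, 1], [-1, 0, 1], [-1, 0, 1]]
print(Y)  # [[-1, -1, -1], [0, 0, 0], [1, 1, 1]]
```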

distributed_trajectories.udfs.middle_interval_for_x(x, A, B, m)

Given the borders [A, B] and the number of intervals m, returns as the new x the midpoint of the interval that x belongs to.
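With equal-width intervals of length h = (B − A)/m, the interval index is k = ⌊(x − A)/h⌋ and the midpoint is A + (k + ½)·h. The sketch below implements that formula; it assumes x ∈ [A, B] and assigns x = B to the last interval, which is a choice the original does not specify. The name `middle_of_interval` is hypothetical.

```python
import math


def middle_of_interval(x: float, A: float, B: float, m: int) -> float:
    """Snap x in [A, B] to the midpoint of the equal-width interval it falls into."""
    h = (B - A) / m  # width of each of the m intervals
    k = min(math.floor((x - A) / h), m - 1)  # interval index; x == B goes to the last interval
    return A + (k + 0.5) * h


print(middle_of_interval(0.3, 0.0, 1.0, 4))  # 0.375
print(middle_of_interval(1.0, 0.0, 1.0, 4))  # 0.875
```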

distributed_trajectories.udfs.put_gauss_on_mesh(X, Y, mu=0, sigma=2)

Given the mesh, produces a Gauss-like bell on top of it and returns it as a list (the unrolled 2D array).
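A sketch of the bell, assuming the unnormalized form exp(−((x − μ)² + (y − μ)²) / (2σ²)) with the same μ on both axes, evaluated at every mesh point and flattened row by row. Whether the library normalizes the result is not stated here. The name `gauss_on_mesh` is hypothetical.

```python
import math
from typing import List


def gauss_on_mesh(X: List[List[float]], Y: List[List[float]], mu: float = 0.0, sigma: float = 2.0) -> List[float]:
    """Evaluate exp(-((x - mu)^2 + (y - mu)^2) / (2 sigma^2)) on every mesh point, flattened row by row."""
    return [
        math.exp(-((x - mu) ** 2 + (y - mu) ** 2) / (2 * sigma ** 2))
        for row_x, row_y in zip(X, Y)
        for x, y in zip(row_x, row_y)
    ]


X = [[-1, 0, 1]] * 3
Y = [[-1, -1, -1], [0, 0, 0], [1, 1, 1]]
bell = gauss_on_mesh(X, Y)
print(len(bell), bell[4])  # 9 1.0  (the center of the mesh holds the peak value)
```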

distributed_trajectories.udfs.updates_to_the_transition_matrix(state1, state2)

Produces an update to the transition matrix due to the transition from state1 to state2.

Returns

List of tuples
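One natural way to turn a pair of distributed states into matrix updates is their outer product: every cell of state1 contributes to every cell of state2 with the product of their probabilities. This sketch assumes that technique and a hypothetical sparse state format `[(cell, probability), ...]`; it is not confirmed to be what the library does.

```python
from typing import List, Tuple

# Hypothetical sparse state: list of (1D cell index, probability)
State = List[Tuple[int, float]]


def transition_updates(state1: State, state2: State) -> List[Tuple[int, int, float]]:
    """Outer product of two sparse states: one (from_cell, to_cell, weight) tuple per pair of cells."""
    return [(c1, c2, p1 * p2) for c1, p1 in state1 for c2, p2 in state2]


s1 = [(0, 0.5), (1, 0.5)]
s2 = [(4, 1.0)]
print(transition_updates(s1, s2))  # [(0, 4, 0.5), (1, 4, 0.5)]
```

If both states sum to 1, the weights of one transition also sum to 1, so summing these tuples per (from, to) pair and then normalizing rows yields a transition matrix.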


Main module

class distributed_trajectories.distributed_trajectories.PrepareDataset(path)

Reads, filters and prepares the dataset.

crop_data()

Crops the dataset to fit into the Beijing bounding box.

Returns

filtered self.df

filter_too_sparse_IDs()

Keeps only the tracks that have more than timestamps_per_hour points per hour on average.

Returns

filtered self.df
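The filtering idea can be sketched in plain Python. "Points per hour on average" is interpreted here, as an assumption, as the total number of points divided by the number of distinct clock hours in which the ID has data; the record layout and the name `keep_dense_tracks` are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

# Hypothetical record layout: (object_id, unix_timestamp in seconds)
Record = Tuple[str, int]


def keep_dense_tracks(records: List[Record], timestamps_per_hour: float) -> List[Record]:
    """Keep records of IDs whose average number of points per active hour exceeds the threshold.

    "Per hour on average" is assumed to mean: total points / number of distinct clock hours with data.
    """
    counts: Dict[str, int] = defaultdict(int)
    hours: Dict[str, Set[int]] = defaultdict(set)
    for obj_id, ts in records:
        counts[obj_id] += 1
        hours[obj_id].add(ts // 3600)  # bucket timestamps into whole hours
    dense = {obj_id for obj_id in counts if counts[obj_id] / len(hours[obj_id]) > timestamps_per_hour}
    return [r for r in records if r[0] in dense]


records = [("a", 0), ("a", 600), ("a", 1200), ("b", 0), ("b", 7200)]
print(keep_dense_tracks(records, timestamps_per_hour=2))  # [('a', 0), ('a', 600), ('a', 1200)]
```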

get_data()

Getter for self.df.

Returns

self.df

get_schema()

Getter for the DF schema.

read()

Reads the PySpark DF.

Returns

PySpark DF

remove_duplicates()

Removes duplicate entries that share the same ID and timestamp.

Returns

filtered self.df

remove_too_fast_objects()

Some data entries are clearly erroneous: some objects appear to move at 10 to 20 km per second. Such objects are removed.

Returns

filtered self.df
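A plain-Python sketch of speed-based filtering. It assumes hypothetical `(object_id, unix_ts_seconds, lat_deg, lon_deg)` records, measures distance with the haversine formula on a sphere of radius 6371 km, and removes every object whose speed between two successive observations exceeds a caller-chosen threshold; `max_speed_mps` is a hypothetical parameter, not a documented setting of the library.

```python
import math
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical record layout: (object_id, unix_ts_seconds, lat_deg, lon_deg)
Record = Tuple[str, int, float, float]


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in metres, using a mean Earth radius of 6371 km."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp, dl = p2 - p1, math.radians(lon2 - lon1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * 6371000.0 * math.asin(min(1.0, math.sqrt(a)))  # clamp guards against rounding above 1


def drop_too_fast(records: List[Record], max_speed_mps: float) -> List[Record]:
    """Remove every object whose speed between two successive observations exceeds max_speed_mps."""
    tracks: Dict[str, List[Record]] = defaultdict(list)
    for r in records:
        tracks[r[0]].append(r)
    too_fast = set()
    for obj_id, track in tracks.items():
        track.sort(key=lambda r: r[1])  # successive observations in time order
        for (_, t1, la1, lo1), (_, t2, la2, lo2) in zip(track, track[1:]):
            dt = t2 - t1
            # dt == 0 pairs are skipped; duplicates are expected to be removed beforehand
            if dt > 0 and haversine_m(la1, lo1, la2, lo2) / dt > max_speed_mps:
                too_fast.add(obj_id)
                break
    return [r for r in records if r[0] not in too_fast]


records = [
    ("a", 0, 39.90, 116.40), ("a", 60, 39.901, 116.40),  # about 111 m in 60 s
    ("b", 0, 39.90, 116.40), ("b", 10, 40.90, 116.40),   # about 111 km in 10 s
]
print(drop_too_fast(records, max_speed_mps=100.0))
# [('a', 0, 39.9, 116.4), ('a', 60, 39.901, 116.4)]
```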

set_avg_time_for_cell()

For each cell, computes avg_ts, the average timestamp of all points in that cell; ts_1 and ts_2 are the timestamps at which the user enters and leaves the cell.

Returns

self.df
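A sketch of the per-cell timing, assuming that consecutive points of one user in the same cell form a single visit, so a user who re-enters a cell starts a new visit. The record layout and the name `cell_visits` are hypothetical.

```python
from itertools import groupby
from typing import List, Tuple

# Hypothetical record layout: (user_id, unix_ts_seconds, cell_id)
Point = Tuple[str, int, int]


def cell_visits(points: List[Point]) -> List[Tuple[str, int, int, int, float]]:
    """Collapse consecutive points of a user in the same cell into one visit.

    Returns (user_id, cell_id, ts_1, ts_2, avg_ts): enter time, leave time and mean timestamp.
    """
    visits = []
    ordered = sorted(points, key=lambda p: (p[0], p[1]))  # by user, then by time
    for (user, cell), group in groupby(ordered, key=lambda p: (p[0], p[2])):
        ts = [p[1] for p in group]
        visits.append((user, cell, ts[0], ts[-1], sum(ts) / len(ts)))
    return visits


points = [("u", 0, 7), ("u", 30, 7), ("u", 60, 8), ("u", 90, 7)]
print(cell_visits(points))
# [('u', 7, 0, 30, 15.0), ('u', 8, 60, 60, 60.0), ('u', 7, 90, 90, 90.0)]
```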

set_helper_column()

Adds a helper column needed for further aggregations.

Returns

self.df

set_middle_interval_for_x()

For each x, sets its value to the center of the cell that x falls into.

distributed_trajectories.distributed_trajectories.plot(matrix, fname, title)

Plots a sparse matrix.

Parameters
  • title – title of the plot

  • fname – output file name

  • matrix – sparse SciPy matrix

Returns

Plot saved in the plots folder

distributed_trajectories.distributed_trajectories.prepare_for_plot(data, type_)

Takes a PySpark DF and produces a sparse SciPy matrix to plot.

Parameters
  • type_ – Transition Matrix or Origin-Destination

  • data – PySpark DF

Returns

Sparse matrix to plot