import geopandas as gpd import matplotlib.pyplot as plt import folium from folium.plugins import HeatMap from shapely.geometry import Polygon, LineString, Point import matplotlib as mpl import numpy as np import pandas as pd import datetime import os os.environ["RAY_DEDUP_LOGS"] = "0" import ray import multiprocessing import pickle from tqdm import tqdm from scipy import stats, sparse import plotly.express as px import plotly.graph_objs as go from scipy.spatial.distance import cdist try: from config_mapmatch import * except ModuleNotFoundError: raise Warning('config file for map matchingnot found') def write_log(txt_to_write,txt_file="log.txt", mode="a"): with open(txt_file, mode) as file: file.write(txt_to_write) def generate_color(nb_color): distinct_colors = [] while len(distinct_colors)<nb_color: color = tuple(np.random.randint(0, 256, size=3)) if color not in distinct_colors: distinct_colors.append(color) hex_colors = ['#%02x%02x%02x' % (r, g, b) for [r, g, b] in distinct_colors] return hex_colors def get_accurate_start_end_point(df, streetmap, edgesDf): # Here direction is not important # Because we first projected the points to the edge # Then we analyzed the direction of the trip coarse2full_edge = {i:[] for i in edgesDf.index} full2coarse_edge = dict(streetmap.c_edge) for full_edge in full2coarse_edge: coarse_edge = full2coarse_edge[full_edge] coarse2full_edge[coarse_edge].append(full_edge) coords = np.asarray([[list(row["geometry"].coords[0]), list(row["geometry"].coords[-1])] for index, row in streetmap.iterrows()]) start = coords[:,0, :] end = coords[:,1, :] accu_dist = streetmap["accu_dist"].to_numpy() full_edge_km = streetmap["km"].to_numpy() coarse_edge = df["edge"].to_numpy() travelled_dist = df["km"].to_numpy()*df["frcalong"].to_numpy() edge_index = [] fracs = [] for i in range(len(coarse_edge)): max_dist = -1 for j in coarse2full_edge[coarse_edge[i]]: accu = accu_dist[j] if accu<=travelled_dist[i] and accu>max_dist: max_dist = accu min_edge = j frac_ = max(0, min(1, (travelled_dist[i]-accu)/full_edge_km[j])) edge_index.append(min_edge) fracs.append(frac_) fracs=np.expand_dims(np.asarray(fracs), axis=-1) projected_points = start[edge_index]*(1-fracs) + end[edge_index]*fracs return edge_index, fracs, projected_points[:, 0], projected_points[:, 1] def tracetable(tracesTable): df = read_h5(tracesTable) df = df.sort_values(by=["tripID", "timestamp"]) df["timestamp"] = (df.timestamp-datetime.datetime(2020, 10, 1)).dt.total_seconds() return df def distanceLL(distance): """Geometric log likelihood function for how to penalize edges that are further from the point Similar to Newson and Krummer 2009 This can take a scalar or a numpy array""" # return stats.t(df=20, scale=15).logpdf(distance)+(stats.t(df-20, scale)) # return stats.t(df=20, scale=sigma_z).logpdf(distance) return (stats.t(df=20, scale=sigma_z).logpdf(distance)-stats.t(df=20, scale=sigma_z).logpdf(dist_threshold)+stats.t(df=20, scale=20).logpdf(dist_threshold))*(distance>=dist_threshold)+stats.t(df=20, scale=20).logpdf(distance)*(distance<dist_threshold) # return (stats.t(df=20, scale=sigma_z).logpdf(distance)-stats.t(df=20, scale=sigma_t).logpdf(dist_threshold)+stats.t(df=20, scale=15).logpdf(dist_threshold))*(distance>=dist_threshold)+stats.t(df=20, scale=15).logpdf(distance)*(distance<dist_threshold) def temporalLL(travelcostratio): """Log likelihood function for the transition between different edges Input is ratio of implied speed to speed limit""" return stats.t(df=20, scale=sigma_t).logpdf(travelcostratio) def speedLL(speed): """Log likelihood function for the transition between different edges Input is ratio of implied speed to speed limit""" return stats.norm(loc=6, scale=2).logpdf(speed)*((speed>10).astype(float))#*((speed<3.6).astype(float)+(speed>11).astype(float)) def topologicalLL(distratio): """this is the topological log likelihood function, based on the distance ratio between GPS trace and matched line""" dr = np.maximum(0, np.array(distratio)-1) # distratio can be less than 1 if there is a U-turn, so enforce a minimum return stats.t(df=20, scale=sigma_topol).logpdf(dr)*topol_weight # ensures that the two distributions match at 1 temporalLL_ratio = (stats.expon(scale=temporal_scale).logpdf(1)-stats.norm(scale=sigma_t).logpdf(0)) def geo_dist(v1, v2): # distance by m R = 6371000 v1, v2 = np.radians(v1), np.radians(v2) lon1, lat1 = v1[:, :, 0], v1[:, :, 1] lon2, lat2 = v2[:, :, 0], v2[:, :, 1] dlat = lat2-lat1 dlon = lon2-lon1 a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2)**2 c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)) return R*c def geo_dist_arr(v1, v2): # distance by m R = 6371000 v1, v2 = np.radians(v1), np.radians(v2) lon1, lat1 = v1[:, 0], v1[:, 1] lon2, lat2 = v2[:, 0], v2[:, 1] dlat = lat2-lat1 dlon = lon2-lon1 a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2)**2 c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)) return R*c def geo_dist_single(v1, v2): # distance by m R = 6371000 v1, v2 = np.radians(v1), np.radians(v2) lon1, lat1 = v1 lon2, lat2 = v2 dlat = lat2-lat1 dlon = lon2-lon1 a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2)**2 c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)) return R*c def colorFader(arr, c1="#FFFFFF", c2="#040404"): #fade (linear interpolate) from color c1 (at mix=1) to c2 (mix=0) mix = (arr-min(arr))/(max(arr)-min(arr)) if max(arr)==min(arr): mix = np.ones_like(arr) c1=np.array(mpl.colors.to_rgb(c1)) c2=np.array(mpl.colors.to_rgb(c2)) return [mpl.colors.to_hex(i*c1 + (1-i)*c2) for i in mix] def store_h5(file_path, data, key="my_dataframe"): store = pd.HDFStore(file_path) store[key] = data store.close() def read_h5(file_path, time_converse=True, key="my_dataframe"): store = pd.HDFStore(file_path) read_data = store[key] store.close() if time_converse: read_data["timestamp"] = pd.to_datetime(read_data["timestamp"]) return read_data def get_map_dict(map_, raw_dict_path): puntid_dict_path, idpunt_dict_path, punttype_dict_path, roadtype_dict_path=raw_dict_path+"punt_id.json", raw_dict_path+"id_punt.json", raw_dict_path+"punt_type.json", raw_dict_path+"roadtype.json" if os.path.exists(puntid_dict_path): with open(puntid_dict_path, 'rb') as f: puntid_dict = pickle.load(f) with open(idpunt_dict_path, 'rb') as f: idpunt_dict = pickle.load(f) with open(punttype_dict_path, 'rb') as f: punttype_dict = pickle.load(f) with open(roadtype_dict_path, 'rb') as f: roadtype_dict = pickle.load(f) print("No new road dictionary is written.") return puntid_dict, idpunt_dict, punttype_dict, roadtype_dict puntid_dict, punttype_dict = {}, {} for index, row in tqdm(map_.iterrows()): for j in row.geometry.coords: punt = str(j) if punt not in puntid_dict: puntid_dict[punt] = len(puntid_dict) if puntid_dict[punt] not in punttype_dict: punttype_dict[puntid_dict[punt]] = [row["type"]] elif row["type"] not in punttype_dict[puntid_dict[punt]]: punttype_dict[puntid_dict[punt]].append(row["type"]) idpunt_dict = {puntid_dict[i]:i for i in puntid_dict} roadtype_dict = {value:i for i, value in enumerate(map_["type"].unique())} with open(puntid_dict_path, 'wb') as f: pickle.dump(puntid_dict, f) with open(idpunt_dict_path, 'wb') as f: pickle.dump(idpunt_dict, f) with open(punttype_dict_path, 'wb') as f: pickle.dump(punttype_dict, f) with open(roadtype_dict_path, 'wb') as f: pickle.dump(roadtype_dict, f) print("New road dictionary is written.") return puntid_dict, idpunt_dict, punttype_dict, roadtype_dict def add_intermediate_coords(lst, step_size = 1e-4): start, end = lst # Calculate the number of intermediate points num_points = int(np.ceil(np.linalg.norm(end - start) / step_size)) return [start + i * (end - start) / num_points for i in range(num_points + 1)] def str2lst(input_string): coordinates_str = input_string.strip('()').split(',') coordinates_list = np.array([float(coord.strip()) for coord in coordinates_str]) return coordinates_list class Plot_html(): def __init__(self) -> None: pass def shp_plot_box(self, shapefile, colorby="type"): geo_list, info, color = [], [], [] colors_ = generate_color(shapefile[colorby].nunique()) color_mapping = {value: index for index, value in enumerate(shapefile[colorby].unique())} for index, row in shapefile.iterrows(): for j in row.geometry.coords: geo_list += [list(j)] geo_list += [[None, None]] color = color + [colors_[color_mapping[row[colorby]]] for i in range(len(row.geometry.coords))] + ["#000000"] if "osm_id" not in shapefile.columns: info = info + ['idx: '+str(index) for i in range(len(row.geometry.coords))] + [' '] else: info = info + ['osm_id: '+str(row["osm_id"])+"<br>Roadtype: "+str(row[colorby]) for i in range(len(row.geometry.coords))] + [' '] geo_list = np.asarray(geo_list) lons, lats, color, info = geo_list[:,0], geo_list[:,1], color, info return lons, lats, color, info def shp_plot_selective(self, shapefile, outputfile=None, colorby="type"): colors_ = generate_color(shapefile[colorby].nunique()) color_mapping = {value: index for index, value in enumerate(shapefile[colorby].unique())} plot_dict = {} for type_ in shapefile[colorby].unique(): geo_list, info, color = [], [], [] df=shapefile[shapefile[colorby]==type_] for index, row in df.iterrows(): for j in row.geometry.coords: geo_list += [list(j)] geo_list += [[None, None]] color = color + [colors_[color_mapping[row[colorby]]] for i in range(len(row.geometry.coords))] + ["#000000"] info = info + ['Full edge: '+str(row.index)+"<br>Roadtype: "+str(row[colorby]) for i in range(len(row.geometry.coords))] + [' '] geo_list = np.asarray(geo_list) lons, lats, color, info = geo_list[:,0], geo_list[:,1], color, info plot_dict[type_] = [lons, lats, color, info] if outputfile: self.plot_trace(multipleRoutes=plot_dict, outputpath=outputfile) return plot_dict def poi_plot_box(self, shapefile): geo_list, info = [], [] for index, row in shapefile.iterrows(): for j in row.geometry.coords: geo_list += [list(j)] info = info + ['osm_id: '+str(row["osm_id"])+"<br>Type: "+row["type"]] geo_list = np.asarray(geo_list) lons, lats, info = geo_list[:,0], geo_list[:,1], info return lons, lats, info def plot_map_objs(self, outputpath, line_box=False, marker_box=False, line_width=10, line_color='#6785C0'): # if os.path.exists(outputpath): # return line_marker_size, marker_size = 20, 10 zoom_center = {"lat": 51.925818, "lon":4.464207} fig = go.Figure() if line_box: lons, lats, color, info = line_box if color == None: color = line_color line_trace = go.Scattermapbox( mode = "markers+lines+text", lon = lons, lat = lats, marker = {'size': line_marker_size, 'color': color},line={'width':line_width, 'color':line_color}, name = "Line", text=info) fig.add_trace(line_trace) if marker_box: lons, lats, info = marker_box marker_trace = go.Scattermapbox( mode = "markers+text", lon = lons, lat = lats, marker = {'size': marker_size, 'color': "#FF0000"}, name = "Marker", text=info) fig.add_trace(marker_trace) fig.update_layout(mapbox_style="open-street-map") fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0}, mapbox = { 'center': zoom_center, 'zoom': 12}) fig.update_layout(legend={"orientation":"h"}) fig.update_layout(height=1000, width=2500) fig.write_html(outputpath) def plot_trace(self, outputpath, line_marker_size=20, marker_size=10, background=None, traces=None, routes=None, multipleTraces=None, multipleRoutes=None, multipleProjection=None): # if os.path.exists(outputpath): # return zoom_center = {"lat": 51.9248025, "lon":4.5} fig = go.Figure() if background: lons, lats, color, info = background line_trace0 = go.Scattermapbox( mode = "markers+lines+text", lon = lons, lat = lats, marker = {'size': line_marker_size, 'color': '#CCFFFF'},line={'width':10, 'color':'#CCFFFF'}, name = "Background", text=info) fig.add_trace(line_trace0) if traces: lons, lats, color, info = traces line_trace1 = go.Scattermapbox( mode = "markers+lines+text", lon = lons, lat = lats, marker = {'size': line_marker_size, 'color': color},line={'width':5, 'color':'#FFCCE5'}, name = "Trace", text=info) fig.add_trace(line_trace1) if routes: lons, lats, color, info = routes if color is None: color, edge_color="#5CA961", "#5CA961" else: edge_color="#CCFFFF" line_trace2 = go.Scattermapbox( mode = "markers+lines+text", lon = lons, lat = lats, marker = {'size': line_marker_size, 'color': color},line={'width':10, 'color':edge_color}, name = "Selected_edge", text=info) fig.add_trace(line_trace2) if multipleTraces: for tripID in multipleTraces.keys(): lons, lats, color, info, = multipleTraces[tripID] line_trace = go.Scattermapbox( mode = "markers+lines+text", lon = lons, lat = lats, marker = {'size': line_marker_size, 'color': color},line={'width':5, 'color':'#FFCCE5'}, name = str(tripID), text=info) fig.add_trace(line_trace) if multipleRoutes: for tripID in multipleRoutes.keys(): lons, lats, color, info, = multipleRoutes[tripID] line_trace = go.Scattermapbox( mode = "markers+lines+text", lon = lons, lat = lats, marker = {'size': line_marker_size, 'color': color},line={'width':5, 'color':'#5CA961'}, name = str(tripID), text=info) fig.add_trace(line_trace) if multipleProjection: for tripID in multipleProjection.keys(): lons, lats = multipleProjection[tripID] point_trace = go.Scattermapbox( mode = "markers", lon = lons, lat = lats, marker = {'size': marker_size, 'color': "#FF0000"}, name = str(tripID)) fig.add_trace(point_trace) fig.update_layout(mapbox_style="open-street-map") fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0}, mapbox = { 'center': zoom_center, 'zoom': 13}) fig.update_layout(legend={"orientation":"h"}) fig.update_layout(height=800, width=2500) fig.for_each_trace(lambda trace: trace.update(visible="legendonly")) fig.write_html(outputpath) def plot_point(df: pd.DataFrame, outputpath, col_color=None, color_scale=None): # if os.path.exists(outputpath): # return df = df.copy() df["size"] = 0.3 myzoom = 14 # minmaxcolor = [0,30] mycenter = {"lat": df["lat"].unique()[0], "lon": df["lon"].unique()[0]} fig = px.scatter_mapbox(df, lat="lat", lon="lon", color=col_color, color_continuous_scale=color_scale, size="size", size_max=13, # range_color=minmaxcolor, zoom=myzoom, center=mycenter) fig.update_layout(mapbox_style="open-street-map") fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0}) fig.update_layout(legend={"orientation":"h"}) fig.update_layout(height=1000, width=2500) fig.write_html(outputpath) def heatmap_plot(self, data:pd.DataFrame, outputpath, min_lat=51.899736, max_lat=51.952686, min_lon=4.430277, max_lon=4.504152): if os.path.exists(outputpath): return map_obj = folium.Map(location = [(min_lat+max_lat)/2, (min_lon+max_lon)/2], zoom_start = 14) lats_longs_weight = list(map(list, zip(data.lat, data.lon, [1 for j in range(len(data))]))) HeatMap(lats_longs_weight).add_to(map_obj) map_obj.save(outputpath) class Plot_plt(): def __init__(self) -> None: pass def count_time_interval(self, data:pd.DataFrame): df = data.sort_values(by=['tripID', 'timestamp']) df['time_difference'] = df.groupby('tripID')['timestamp'].diff() arr = df["time_difference"].dt.total_seconds().to_numpy()/60 arr[arr>10]=10 return arr def hist_density_plot(self, data:np.ndarray, x_label, y_label, title, bin=100, outputpath=None): fig, ax = plt.subplots(1, 1) ax.hist(data, bins=bin, density=True) ax.set_ylabel(y_label) ax.set_xlabel(x_label) ax.set_title(title) if outputpath: # and not os.path.exists(outputpath): plt.savefig(outputpath, dpi=800) def bar_plot(self, x_values, y_values, x_label, y_label, title, outputpath=None, show_tick=True): fig, ax = plt.subplots(1, 1) ax.bar(x_values, y_values) ax.set_xlabel(x_label) ax.set_ylabel(y_label) ax.set_title(title) if show_tick: for x, y in zip(x_values, y_values): ax.text(x, y, str(y), ha='center', va='bottom') ax.set_xticks(x_values) ax.tick_params(axis='x', rotation=45, labelsize=8) if outputpath and not os.path.exists(outputpath): plt.savefig(outputpath, dpi=800) def plot_given_area(self, left_top=[51.926018, 4.482589], right_bottom=[51.924060, 4.488257], streetmap="mapMatch_result/full_roads.shp", outputpath=None): ## defi1: 51°55'49.1"N 4°29'05.6"E 51°55'46.5"N 4°29'15.3"E ## case1: left_top=[51.926026, 4.480273], right_bottom=[51.924508,4.486673] up_lat, left_lon=left_top down_lat, right_lon=right_bottom rotterdam_map = gpd.read_file(streetmap) forbidden_type = ["primary", "primary_link"] suspicious_type = ["secondary", "secondary_link", "tertiary"] puntid_dict, idpunt_dict, punttype_dict, roadtype_dict = get_map_dict(None, "graph/raw/") F, S, B = [], [], [] polygon = Polygon([(left_lon,up_lat), (left_lon,down_lat), (right_lon,down_lat), (right_lon,up_lat)]) for index,row in rotterdam_map.iterrows(): # print(LineString([i for i in row.geometry.coords])) line = LineString([i for i in row.geometry.coords]) if line.within(polygon): if row["type"] in forbidden_type: F.append(line) F+=[Point(i) for i in row.geometry.coords] elif row["type"] in suspicious_type: S.append(line) S+=[Point(i) for i in row.geometry.coords] else: B.append(line) B+=[Point(i) for i in row.geometry.coords] Important=[Point(str2lst(idpunt_dict[3116])), Point(str2lst(idpunt_dict[3498]))] I_df = gpd.GeoDataFrame(geometry=Important) I_df.crs = 'EPSG:4326' S_df = gpd.GeoDataFrame(geometry=S) S_df.crs = 'EPSG:4326' B_df = gpd.GeoDataFrame(geometry=B) B_df.crs = 'EPSG:4326' F_df = gpd.GeoDataFrame(geometry=F) F_df.crs = 'EPSG:4326' fig, ax = plt.subplots(figsize=(15, 11)) B_df.plot(ax=ax, alpha=0.4, color='green') # all_df.plot(ax=ax, alpha=0.4, color='grey') if len(F): F_df.plot(ax=ax, alpha=0.4, color='red') if len(S): S_df.plot(ax=ax, alpha=0.4, color='blue') I_df.plot(ax=ax, alpha=0.4, color='red', markersize=200, marker='*') if len(F): ax.legend(["Bicycle Roads", "Forbidden Roads", "Suspicious Roads"], loc='lower center', ncols=3) else: ax.legend(["Bicycle Roads", "Bicycle Road Node","Suspicious Roads","Suspicious Road Nodes","Considered road"],bbox_to_anchor=(0.5, -0.2), loc='lower center', ncols=5) plt.xlabel("Longitude") plt.ylabel("Latitude") locs, _= plt.xticks() # plt.xticks([locs[0],locs[-1]],["4°29'20.9\"E", "4°29'39.9\"E"]) # locs, _=plt.yticks() # plt.yticks([locs[0],locs[-1]],["51°55'34.8\"N", "51°55'32.0\"N" ]) # plt.show() plt.savefig('/Users/tinggao/Desktop/cplcate3.png', dpi=600) return