Source code for transport_frames.frame

""" Module for creating and weighting transport frame """

import warnings

import geopandas as gpd
import momepy
import networkx as nx
import osmnx as ox
import pandas as pd
from shapely import unary_union
from shapely.geometry import Point, Polygon
from shapely.ops import unary_union

from transport_frames.utils.helper_funcs import BaseSchema, _determine_ref_type


warnings.simplefilter("ignore", UserWarning)
import re

import contextily as ctx
import matplotlib.pyplot as plt
import pandera as pa
from pandera.typing import Series


[docs]class PointSchema(BaseSchema): """ Schema for validating points. Attributes ---------- _geom_types : list List of allowed geometry types for the points, default is [shapely.Point] """ _geom_types = [Point]
[docs]class CentersSchema(BaseSchema): """ Schema for validating central cities. Attributes ---------- _geom_types : list List of allowed geometry types for the blocks, default is [shapely.Polygon] """ name: Series[str] = pa.Field(nullable=True) _geom_types = [Point]
[docs]def get_frame(graph: nx.MultiDiGraph, admin_centers: gpd.GeoDataFrame, area_polygon: gpd.GeoDataFrame, region_polygons: gpd.GeoDataFrame, country_polygon: gpd.GeoDataFrame=ox.geocode_to_gdf("RUSSIA"))-> nx.MultiDiGraph: """ Creates frame from graph. Edges are filtered based on reg [1,2]. Region point nodes are assigned with 'city_name' attribute. Points, connecting the roads with other regions or countries are marked with 'exit' and 'exit_country'. Parameters ---------- graph: nx.MultiDiGraph City network graph with classified roads and edges. admin_centers : gpd.GeoDataFrame Administrative region centers points to assign 'city_name' attribute region_polygons : gpd.GeoDataFrame Polygons or regions of the country to assign 'border_region' attribute to exits country_polygon : gpd.GeoDataFrame Polygon of the country to assign 'exit_country' attribute Returns ------- nx.MultiDiGraph Frame of the graph with added 'city_node','exit','exit_country','border_region' attributes """ admin_centers = CentersSchema(admin_centers).to_crs(graph.graph["crs"]).copy() frame = _filter_roads(graph) frame = _assign_city_names_to_nodes(admin_centers, frame) frame = mark_exits(frame, area_polygon, region_polygons, country_polygon) if frame.number_of_nodes() > 0: largest_component_nodes = max(nx.weakly_connected_components(frame), key=len) frame = frame.subgraph(largest_component_nodes).copy() return frame
[docs]def _filter_roads(graph) -> nx.MultiDiGraph: """ Filters the graph to include only reg_1 and reg_2 roads. Parameters ---------- graph : nx.MultiDiGraph graph with edges, containing 'reg' attribute Returns ------- nx.MultiDiGraph Frame of filtered edges with 'reg' in [1, 2] """ edges_to_keep = [(u, v, k) for u, v, k, d in graph.edges(data=True, keys=True) if d.get("reg") in ([1, 2])] frame = graph.edge_subgraph(edges_to_keep).copy() for node, data in frame.nodes(data=True): data["nodeID"] = node return frame
[docs]def _assign_city_names_to_nodes(points, frame, max_distance=3000)-> nx.MultiDiGraph: """ Assigns city names to nodes in the graph based on proximity to city centers. Parameters ---------- points : gpd.GeoDataFrame Region centers points frame : nx.MultiDiGraph Frame of filtered edges with 'reg' in [1, 2] max_distance : int Max distance (meters) on which a node can be assigned to a city Returns ------- nx.MultiDiGraph A frame with nodes assigned with 'city_name' attribute """ points = points.to_crs(frame.graph["crs"]) # Ensure same CRS n, e = momepy.nx_to_gdf(frame) # Find nearest nodes within max_distance projected = gpd.sjoin_nearest(points, n, how="left", distance_col="distance", max_distance=max_distance) assigned_cities = set() # Track assigned cities for row in projected.itertuples(): city_name = row.name node_id = row.nodeID if pd.notna(node_id): for _, d in frame.nodes(data=True): if d.get("nodeID") == node_id: d["city_name"] = city_name assigned_cities.add(city_name) # Mark as assigned # Find cities that were not assigned missed_cities = set(points["name"]) - assigned_cities if missed_cities: print("Some cities weren’t assigned to nodes due to remote location:", missed_cities) return frame
[docs]def mark_exits( frame: nx.MultiDiGraph, area_polygon: gpd.GeoDataFrame, regions_polygons: gpd.GeoDataFrame, country_polygon: gpd.GeoDataFrame ) -> nx.MultiDiGraph: """ Assign the 'exit', 'exit_country' and 'boorder_region' attribute to nodes in a GeoDataFrame based on their intersection with city boundaries. Parameters: ----------- frame : nx.MultiDiGraph Frame graph area_polygon : gpd.GeoDataFrame GeoDataFrame with boundary of the inverstigated region regions_polygons : gpd.GeoDataFrame GeoDataFrame with polygons of the regions with 'name' column country_polygon : gpd.GeoDataFrame GeoDataFrame with boundary of the inverstigated country Returns ------- nx.MultiDiGraph Frame with assigned 'exit', 'exit_country' and 'boorder_region' node attributes """ gdf_nodes = momepy.nx_to_gdf(frame)[0] city_boundary = unary_union(area_polygon.to_crs(gdf_nodes.crs).boundary) regions_polygons.to_crs(gdf_nodes.crs, inplace=True) country_polygon.to_crs(gdf_nodes.crs, inplace=True) gdf_nodes.loc[:, "exit"] = gdf_nodes["geometry"].apply( lambda point: True if city_boundary.intersects(point.buffer(0.1)) else False ) if len(gdf_nodes[gdf_nodes["exit"] == True]) == 0: print("There are no region exits. Try using a larger polygon for downloading from osm") exits = gdf_nodes[gdf_nodes.exit == 1].copy() country_boundary = unary_union(country_polygon.to_crs(exits.crs).boundary) exits.loc[:, "exit_country"] = exits["geometry"].apply( lambda point: True if country_boundary.intersects(point.buffer(0.1)) else False ) gdf_nodes = gdf_nodes.assign(exit_country=exits["exit_country"]) gdf_nodes["exit_country"] = False gdf_nodes.loc[exits.index, "exit_country"] = exits["exit_country"].astype(bool) gdf_nodes["exit_country"] = gdf_nodes["exit_country"].fillna(False) gdf_nodes = add_region_attr(gdf_nodes, regions_polygons, area_polygon) for i, (node, data) in enumerate(frame.nodes(data=True)): data["exit"] = gdf_nodes.iloc[i]["exit"] data["exit_country"] = gdf_nodes.iloc[i]["exit_country"] data["border_region"] = gdf_nodes.iloc[i]["border_region"] return frame
[docs]def add_region_attr(n: gpd.GeoDataFrame, regions: gpd.GeoDataFrame, polygon_buffer: gpd.GeoDataFrame)-> gpd.GeoDataFrame: """ Add a 'border_region' attribute to nodes based on their intersection with region polygons. Parameters ---------- n : gpd.GeoDataFrame Nodes GeoDataFrame with 'exit' and 'exit_country' attributes regions : GeoDataFrame Regions GeoDataFrame with 'name' column polygon_buffer : gpd.GeoDataFrame GeoDataFrame of region of interest boundary Returns ------- gpd.GeoDataFrame Updated nodes GeoDataFrame with 'border_region' attribute """ exits = n[n["exit"] == 1] exits = exits.to_crs(n.crs) exits.loc[:, "buf"] = exits["geometry"].buffer(1000) filtered_regions = _filter_polygons_by_buffer(regions, polygon_buffer, n.crs) joined_gdf = gpd.sjoin( exits.set_geometry("buf"), filtered_regions.to_crs(exits.crs), how="left", predicate="intersects", ) joined_gdf = joined_gdf.drop_duplicates(subset="geometry") exits.loc[:, "border_region"] = joined_gdf["name"] n.loc[:, "border_region"] = exits["border_region"] return n
[docs]def _filter_polygons_by_buffer(gdf_polygons: gpd.GeoDataFrame, polygon_buffer: Polygon, crs)-> gpd.GeoDataFrame: """ Extract and filter region polygons based on a buffer around a given polygon. Parameters ----------- gdf_polygons : gpd.GeoDataFrame Gdf of regions polygons polygon_buffer : Polygon Polygon of the buffer polygon. Returns ------- - GeoDataFrame: Filtered GeoDataFrame of region polygons. """ gdf_polygons = gdf_polygons.to_crs(crs) polygon_buffer = polygon_buffer.to_crs(crs) polygon_buffer = gpd.GeoDataFrame({"geometry": polygon_buffer.buffer(0.1)}, crs=polygon_buffer.crs).to_crs( gdf_polygons.crs ) gdf_polygons = gpd.overlay(gdf_polygons, polygon_buffer, how="difference") buffer_polygon = polygon_buffer.buffer(5000) filtered_gdf = gdf_polygons[gdf_polygons.intersects(buffer_polygon.unary_union)] return filtered_gdf
[docs]def plot_frame(frame: nx.MultiDiGraph, zoom_out_factor: float = 1.1) -> None: """ Plot the transport frame with exit points, city nodes, and categorized roads. The function visualizes the road network using a basemap and styles edges and nodes based on their attributes: - **Edges (Road Segments)**: - `reg == 1` → Red (Higher priority roads) - `reg == 2` → Blue (Lower priority roads) - **Nodes (Intersections & Key Points)**: - `exit == 1` → Green (Exit points) - `exit_country == 1` → Black (Country exit points) - `city_name.notna()` → Red (City center nodes) Parameters ---------- frame : nx.MultiDiGraph The road network graph. zoom_out_factor : float, optional Scaling factor to adjust the map extent by zooming out (default: `1.1`). Returns ------- None Displays a styled transport frame plot with a basemap and legend. """ # Convert graph to GeoDataFrames nodes, edges = momepy.nx_to_gdf(frame) # Convert to Web Mercator (EPSG:3857) nodes = nodes.to_crs(epsg=3857) edges = edges.to_crs(epsg=3857) # Set up the figure fig, ax = plt.subplots(figsize=(10, 10)) # Get bounding box and compute margins for zoom-out effect xmin, ymin, xmax, ymax = edges.total_bounds x_margin = (xmax - xmin) * (zoom_out_factor - 1) / 2 y_margin = (ymax - ymin) * (zoom_out_factor - 1) / 2 # Apply new limits with margins ax.set_xlim(xmin - x_margin, xmax + x_margin) ax.set_ylim(ymin - y_margin, ymax + y_margin) # Add basemap basemap = ctx.providers.OpenStreetMap.Mapnik = ctx.providers.OpenStreetMap.Mapnik ctx.add_basemap(ax, source=basemap, crs=3857, alpha=0.7) # **Plot edges (roads)** edges[edges["reg"] == 1].plot(ax=ax, color="red", linewidth=1.2, alpha=0.8, label="Reg 1") edges[edges["reg"] == 2].plot(ax=ax, color="blue", linewidth=1.2, alpha=0.8, label="Reg 2") # **Plot nodes (intersections, exits, cities)** nodes[nodes["exit"] == 1].plot(ax=ax, color="green", markersize=30, label="Exit Nodes") nodes[nodes["exit_country"] == 1].plot(ax=ax, color="black", markersize=30, label="Country Exit Nodes") nodes[nodes["city_name"].notna()].plot(ax=ax, color="red", markersize=40, label="City Nodes") # Remove axis labels and frame ax.set_xticks([]) ax.set_yticks([]) ax.set_frame_on(False) # Add legend ax.legend(loc="lower right") # Show plot plt.show()
[docs]def weigh_roads(frame: nx.MultiDiGraph, restricted_terr_gdf: gpd.GeoDataFrame = None) -> gpd.GeoDataFrame: """ Assigns and normalizes weights for roads in the network based on their proximity to exits. The function calculates road segment (edge) weights by evaluating their connections to exit points and the type of regions they traverse. If provided, restricted territories influence weight assignment. The function then normalizes weights for further analysis. Parameters ---------- frame : nx.MultiDiGraph The road network graph where nodes represent intersections or exits, and edges represent road segments. restricted_terr_gdf : gpd.GeoDataFrame, optional GeoDataFrame containing restricted territories that may influence road weight calculations (default: `None`). Returns ------- nx.MultiDiGraph The updated road network graph with assigned and normalized road segment weights. """ frame = frame.copy() n, e = momepy.nx_to_gdf(frame) n, e, frame = _mark_ref_type(n, e, frame) if restricted_terr_gdf is not None: country_exits = n[n["exit_country"] == True].copy() # Преобразуем CRS для совместимости restricted_terr_gdf = restricted_terr_gdf.to_crs(country_exits.crs) # Для каждой страны из restricted_terr_gdf применяем логику for _, row in restricted_terr_gdf.iterrows(): border_transformed = row["geometry"] buffer_area = border_transformed.buffer(300) mask = country_exits.geometry.apply(lambda x: x.intersects(buffer_area)) # Применяем метку (mark) к соответствующим участкам n.loc[mask[mask == True].index, "ref_type"] = row["mark"] e["weight"] = 0.0 n["weight"] = 0.0 exits = n[n["exit"] == 1] for i1, start_node in exits.iterrows(): for i2, end_node in exits.iterrows(): if i1 == i2: continue if pd.notna(start_node["border_region"]) and start_node["border_region"] == end_node["border_region"]: continue if start_node.geometry.buffer(15000).intersects(end_node.geometry.buffer(15000)) and ( pd.isna(start_node["exit_country"]) == pd.isna(end_node["exit_country"]) ): continue if start_node["exit_country"] == 1 and end_node["exit_country"] == 1: continue weight = _get_weight(start_node["ref_type"], end_node["ref_type"], end_node["exit_country"]) try: path = nx.astar_path(frame, i1, i2, weight="time_min") except nx.NetworkXNoPath: continue for j in range(len(path) - 1): n.loc[(n["nodeID"] == path[j]), "weight"] += weight e.loc[ (e["node_start"] == path[j]) & (e["node_end"] == path[j + 1]), "weight", ] += weight n.loc[(n["nodeID"] == path[j + 1]), "weight"] += weight n["weight"] = round(n.weight, 3) min_weight = e["weight"].min() max_weight = e["weight"].max() e["norm_weight"] = (e["weight"] - min_weight) / (max_weight - min_weight) for i, (e1, e2, k, data) in enumerate(frame.edges(data=True, keys=True)): data["weight"] = e.iloc[[i]]["weight"][i] data["norm_weight"] = e.iloc[[i]]["norm_weight"][i] for i, (node, data) in enumerate(frame.nodes(data=True)): data["exit"] = n.iloc[i]["exit"] data["exit_country"] = n.iloc[i]["exit_country"] data["weight"] = n.iloc[i]["weight"] data["ref_type"] = n.iloc[i]["ref_type"] return frame
[docs]def _mark_ref_type(n: gpd.GeoDataFrame, e: gpd.GeoDataFrame, frame: nx.MultiDiGraph ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame, nx.MultiDiGraph]: """ Assign reference types to nodes in the road network based on proximity to reference edges. This function identifies nodes marked as exits and assigns reference values and types based on the closest reference edge. The assigned values help categorize roads according to their significance. Parameters ---------- n : gpd.GeoDataFrame GeoDataFrame of road network nodes, including exit nodes to be marked with reference types. e : gpd.GeoDataFrame GeoDataFrame of road network edges containing reference attributes used for classification. frame : nx.MultiDiGraph The road network graph where nodes represent intersections or exits. Returns ------- tuple[gpd.GeoDataFrame, gpd.GeoDataFrame, nx.MultiDiGraph] - Updated GeoDataFrame of nodes with assigned reference values and types. - The original GeoDataFrame of edges. - The updated road network graph with relabeled nodes. """ n["ref"] = None ref_edges = e[e["ref"].notna()] for idx, node in n.iterrows(): if node["exit"] == 1: point = node.geometry distances = ref_edges.geometry.distance(point) if not distances.empty: nearest_edge = ref_edges.loc[distances.idxmin()] ref_value = nearest_edge["ref"] if isinstance(ref_value, list): ref_value = tuple(ref_value) if isinstance(ref_value, str): ref_value = (ref_value,) n.at[idx, "ref"] = ref_value n.at[idx, "ref_type"] = _determine_ref_type(ref_value) n = n.set_index("nodeID") mapping = {node: data["nodeID"] for node, data in frame.nodes(data=True)} n.reset_index(inplace=True) frame = nx.relabel_nodes(frame, mapping) return n, e, frame
[docs]def _determine_ref_type(ref: str) -> float: """ Convert a reference string into a numeric classification. This function categorizes road references based on predefined patterns, assigning a numeric value that represents the road type. Parameters ---------- ref : str A road reference string that follows a standard naming convention. Returns ------- float A numeric code corresponding to the identified reference type. If no match is found, a default classification (2.3) is assigned. """ patterns = { 1.1: r"М-\d+", 1.2: r"Р-\d+", 1.3: r"А-\d+", 2.1: r"..Р-\d+", 2.2: r"..А-\d+", } for value in ref: for ref_type, pattern in patterns.items(): if re.match(pattern, value): return ref_type return 2.3
[docs]def _get_weight(start: float, end: float, exit: bool) -> float: """ Compute the weight for a road segment based on reference types and exit status. This function determines the weight of a road connection by considering the reference types of the start and end nodes, as well as whether the segment represents an exit. Different weight matrices are used for exit and non-exit scenarios. Parameters ---------- start : float The reference type classification of the start node. end : float The reference type classification of the end node. exit : bool Indicates whether the segment represents an exit (True if exit, False otherwise). Returns ------- float The weight value derived from the corresponding weight matrix. """ dict = {1.1: 0, 1.2: 1, 1.3: 2, 2.1: 3, 2.2: 4, 2.3: 5, 0.0: 6, 0.5: 7} if exit == 1: matrix = [ [0.12, 0.12, 0.12, 0.12, 0.12, 0.12, 0.00001, 0.05], # 2.1.1 [0.10, 0.10, 0.10, 0.10, 0.10, 0.10, 0.00001, 0.05], # 2.1.2 [0.08, 0.08, 0.08, 0.08, 0.08, 0.08, 0.00001, 0.05], # 2.1.3 [0.07, 0.07, 0.07, 0.07, 0.07, 0.07, 0.00001, 0.05], # 2.2.1 [0.06, 0.06, 0.06, 0.06, 0.06, 0.06, 0.00001, 0.05], # 2.2.2 [0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.00001, 0.05], # 2.2.3 [0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.00001, 0.05], # 2.2.3 [0.00001, 0.00001, 0.00001, 0.00001, 0.00001, 0.00001, 0.00001, 0.00001], [0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.00001, 0.05], ] else: matrix = [ [0.08, 0.08, 0.08, 0.08, 0.08, 0.08, 0.00001, 0.05], # 2.1.1 [0.07, 0.07, 0.07, 0.07, 0.07, 0.07, 0.00001, 0.05], # 2.1.2 [0.06, 0.06, 0.06, 0.06, 0.06, 0.06, 0.00001, 0.05], # 2.1.3 [0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.00001, 0.05], # 2.2.1 [0.04, 0.04, 0.04, 0.04, 0.04, 0.04, 0.00001, 0.05], # 2.2.2 [0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.00001, 0.05], # 2.2.3 [0.00001, 0.00001, 0.00001, 0.00001, 0.00001, 0.00001, 0.00001, 0.00001], [0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.00001, 0.05], ] return matrix[dict[end]][dict[start]]
[docs]def plot_weighted_roads( weighted_graph: nx.MultiDiGraph, scaling_factor: int = 10, zoom_out_factor: float = 1.1 ) -> None: """ Visualize a weighted road network on a map. This function plots a road network where the thickness of edges corresponds to their assigned weights. The visualization includes a basemap for spatial context. Parameters ---------- weighted_graph : nx.MultiDiGraph The road network graph where edges contain weight attributes. scaling_factor : int, optional Factor to scale line thickness based on edge weight (default: 10). zoom_out_factor : float, optional Factor to slightly expand the map view beyond the network boundaries (default: 1.1). Returns ------- None Displays the road network plot with weighted edges. """ nodes, edges = momepy.nx_to_gdf(weighted_graph) # Filter only weighted edges edges = edges[edges["weight"] > 0] # Normalize weight for line thickness edges["scaled_weight"] = edges["weight"] / edges["weight"].max() * scaling_factor # Convert to Web Mercator (EPSG:3857) for basemap compatibility edges = edges.to_crs(epsg=3857) # Set up the figure fig, ax = plt.subplots(figsize=(10, 10)) xmin, ymin, xmax, ymax = edges.total_bounds x_margin = (xmax - xmin) * (zoom_out_factor - 1) / 2 y_margin = (ymax - ymin) * (zoom_out_factor - 1) / 2 # Apply new limits with margins ax.set_xlim(xmin - x_margin, xmax + x_margin) ax.set_ylim(ymin - y_margin, ymax + y_margin) # Add basemap ctx.add_basemap(ax, source=ctx.providers.OpenStreetMap.Mapnik, crs=3857, alpha=1) # Plot weighted roads edges.plot(ax=ax, linewidth=edges["scaled_weight"], color="blue", alpha=0.8) # Remove axis labels and frame ax.set_xticks([]) ax.set_yticks([]) ax.set_frame_on(False) # Show plot plt.show()