Pathfinding




Pay Notebook Creator: Salah Ahmed0
Set Container: Numerical CPU with TINY memory for 10 minutes0
Total0

Pathfinding w/ obstacles

In [2]:
# CrossCompute
obstacle_table_path = 'obstacles.csv'
source_x = 0
source_y = 0
target_x = 9
target_y = 9
target_folder = '/tmp'
In [3]:
%pylab inline
Populating the interactive namespace from numpy and matplotlib
In [4]:
import itertools
import math
import networkx as nx

from os.path import join
from pandas import read_csv
from shapely import wkt
from shapely.geometry import MultiPolygon, GeometryCollection, Point, LineString

source_xy = int(source_x), int(source_y)
target_xy = int(target_x), int(target_y)
obstacle_table = read_csv(obstacle_table_path)
obstacle_polygons = [wkt.loads(x) for x in obstacle_table['WKT']]
mp = MultiPolygon(obstacle_polygons)
mp
Out[4]:
<shapely.geometry.multipolygon.MultiPolygon at 0x7f5a26c1ada0>
In [5]:
def get_points(pointa, pointb, obstacles):
    ''' returns all nodes for coordinate grid
        params: 
            @pointa: starting point, (int, int)
            @pointb: end point, (int, int)
            @obstacles: group of polygons, shapely.geometry.MultiPolygon
    '''
    bounds = list(itertools.chain(pointa, pointb, obstacles.bounds))
    minimum, maximum = int(min(bounds)), int(max(bounds))
    coords = itertools.product(range(minimum, maximum + 1), range(minimum, maximum + 1))
    points = []
    for i in coords:
        point = Point(*i)
        if all([(
         not obstacle.contains(point)) for obstacle in obstacles.geoms]):
            points.append(point)
    return points
In [8]:
points = get_points(source_xy, target_xy, mp)
In [9]:
G = GeometryCollection(obstacle_polygons + points)
G
Out[9]:
<shapely.geometry.collection.GeometryCollection at 0x7f5a26e78160>
In [10]:
nodes = [(c.coords.xy[0][0], c.coords.xy[1][0]) for c in points]
nodes[:10]
Out[10]:
[(0.0, 0.0),
 (0.0, 1.0),
 (0.0, 2.0),
 (0.0, 3.0),
 (0.0, 4.0),
 (0.0, 5.0),
 (0.0, 6.0),
 (0.0, 7.0),
 (0.0, 8.0),
 (0.0, 9.0)]
In [11]:
graph = nx.Graph()
for n in nodes:
    adj_nodes = [(n[0] + 1, n[1]), (n[0], n[1] + 1), (n[0] - 1, n[1]), (n[0], n[1] - 1)]
    adj_nodes = filter(lambda c: c in nodes, adj_nodes)
    for i in adj_nodes:
        graph.add_edge(n, i, weight=1)
    dnodes = [(n[0] + 1, n[1] + 1),
        (n[0] - 1, n[1] + 1), (n[0] - 1, n[1] - 1), (n[0] + 1, n[1] - 1)]
    d_nodes = filter(
      lambda c: c in nodes and all([
       not obstacle.contains(
           LineString([n, c])
       ) for obstacle in obstacle_polygons
      ]), dnodes)
    for i in d_nodes:
        graph.add_edge(n, i, weight=math.sqrt(2))
In [12]:
nx.draw_networkx(graph, with_labels=False)
from shapely.geometry import LineString
line = LineString([
    (0, 0),
    (5, 0),
    (5, 5),
    (3, 5),
    (3, 9),
    (9, 9),
])
line
In [13]:
# !!! Put a pathfinding algorithm here
path = nx.algorithms.dijkstra_path(graph, source_xy, target_xy)
line = LineString(path)
line
Out[13]:
<shapely.geometry.linestring.LineString at 0x7f5a262b80f0>
In [14]:
GeometryCollection(obstacle_polygons + [
    Point(source_xy),
    Point(target_xy),
    line,
])
Out[14]:
<shapely.geometry.collection.GeometryCollection at 0x7f5a26313c18>
In [14]:
from pandas import DataFrame
waypoint_table = DataFrame(path, columns=['x', 'y'])
waypoint_table
Out[14]:
<style> .dataframe thead tr:only-child th { text-align: right; } .dataframe thead th { text-align: left; } .dataframe tbody tr th { vertical-align: top; } </style>
x y
0 0.0 0.0
1 1.0 0.0
2 2.0 0.0
3 3.0 0.0
4 4.0 1.0
5 5.0 2.0
6 6.0 3.0
7 7.0 4.0
8 8.0 5.0
9 9.0 6.0
10 9.0 7.0
11 9.0 8.0
12 9.0 9.0
In [15]:
waypoint_table_path = join(target_folder, 'waypoints.csv')
waypoint_table.to_csv(waypoint_table_path, index=False)
In [17]:
print('optimal_path_table_path = %s' % waypoint_table_path)
optimal_path_table_path = /tmp/waypoints.csv