Skip to content

PostGIS integration

Extra

Not all dependencies for the PostGIS integration are not installed by default. Please use the postgis extra during installation.

The package provides an (opinionated) interface to save/load tracks to/from a Postgres database using the PostGIS extension. The functions are located in the geo_track_analyzer.postgis module and operate on SQLAlchemy Engine that is passed into the functions.

API Reference

create_tables(engine, schema, track_table, points_table, extensions=None)

Create the tables for the tracks and points using the passed Engine

Parameters:

Name Type Description Default
engine Engine

SQLLalchemy Engine for the Postgres database

required
schema str

Name of the schema in the database

required
track_table str

Name of the table containing track information

required
points_table str

Name of the table containing the points of all tracks

required
extensions list[tuple[str, str]] | None

Extensions columns (tuple of name and type). If None is passed heartrate, cadence, power, and temperature are created

None
Source code in geo_track_analyzer/postgis/db.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def create_tables(
    engine: Engine,
    schema: str,
    track_table: str,
    points_table: str,
    extensions: list[tuple[str, str]] | None = None,
) -> None:
    """
    Create the tables for the tracks and points using the passed Engine

    :param engine: SQLLalchemy Engine for the Postgres database
    :param schema: Name of the schema in the database
    :param track_table: Name of the table containing track information
    :param points_table: Name of the table containing the points of
        all tracks
    :param extensions: Extensions columns (tuple of name and type). If None
        is passed heartrate, cadence, power, and temperature are created
    """
    track_stmt = f"""
        CREATE TABLE IF NOT EXISTS {schema}.{track_table} (
            id SERIAL PRIMARY KEY,
            track_name TEXT,
            track_date DATE,
            source TEXT
        );
    """

    if extensions is None:
        extensions = [
            ("heartrate", "INTEGER"),
            ("cadence", "INTEGER"),
            ("power", "INTEGER"),
            ("temperature", "DOUBLE PRECISION"),
        ]

    points_stmt = f"""
        CREATE TABLE IF NOT EXISTS {schema}.{points_table} (
        id SERIAL PRIMARY KEY,
        track_id INTEGER REFERENCES {schema}.{track_table}(id),
        segment_id INTEGER NOT NULL,
        geom GEOGRAPHY(POINT, 4326),
        elevation DOUBLE PRECISION DEFAULT NULL,
        time TIMESTAMP WITH TIME ZONE,
        {",".join([f"{n} {t} DEFAULT NULL" for n, t in extensions])},
        -- Add other extension fields as needed
        CONSTRAINT fk_track FOREIGN KEY (track_id) REFERENCES {schema}.{track_table}(id)
    );
    """

    index_stmt = f"""
    CREATE INDEX {points_table}_geom_idx ON {schema}.{points_table} USING GIST (geom);
    """

    with engine.connect() as conn:
        logger.debug("Table %s.%s created", schema, track_table)
        conn.execute(text(track_stmt))
        conn.commit()
        logger.debug("Table %s.%s created", schema, points_table)
        conn.execute(text(points_stmt))
        conn.commit()
        logger.debug("Index created")
        conn.execute(text(index_stmt))
        conn.commit()

insert_track(track, engine, schema, track_table, points_table, source)

Insert a track into the database

Parameters:

Name Type Description Default
track Track

Track to be inserted

required
engine Engine

SQLLalchemy Engine for the Postgres database

required
schema str

Name of the schema in the database

required
track_table str

Name of the table containing track information

required
points_table str

Name of the table containing the points of all tracks

required
source str

Set a source of the track in the track table.

required

Returns:

Type Description
int | None

None if insertion failed and track_id if successfull

Source code in geo_track_analyzer/postgis/db.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def insert_track(
    track: Track,
    engine: Engine,
    schema: str,
    track_table: str,
    points_table: str,
    source: str,
) -> int | None:
    """
    Insert a track into the database

    :param track: Track to be inserted
    :param engine: SQLLalchemy Engine for the Postgres database
    :param schema: Name of the schema in the database
    :param track_table: Name of the table containing track information
    :param points_table: Name of the table containing the points of
        all tracks
    :param source: Set a source of the track in the track table.

    :return: None if insertion failed and track_id if successfull
    """
    start_time = track.track.get_time_bounds().start_time

    with engine.connect() as conn:
        track_stmt = f"""
            INSERT INTO {schema}.{track_table} (track_name, track_date, source)
            VALUES (:name, :date, :source) RETURNING id
        """
        new_track_id = conn.execute(
            text(track_stmt),
            [
                {
                    "name": track.track.name,
                    "date": None if start_time is None else start_time.date(),
                    "source": source,
                }
            ],
        ).fetchone()
        conn.commit()
        logger.info("Track entry created")
        if new_track_id is None:
            logger.error("Track insertion uncessefull")
            return None
        new_track_id = new_track_id[0]
        logger.debug("Inserted track with id %s", new_track_id)
        extensions = _find_extensions(conn, schema, points_table)
        points_stmt = f"""
            INSERT INTO {schema}.{points_table}
            (track_id, segment_id, geom,
             elevation, time, {",".join(extensions)})
            VALUES (:track_id, :segment_id,
                    ST_SetSRID(ST_MakePoint(:long, :lat), 4326)::geography,
                    :ele, :time,{",".join(map(lambda v: f":{v}", extensions))})
        """
        for point_batch in get_track_data(track, new_track_id, 100):
            conn.execute(text(points_stmt), point_batch)
            conn.commit()
        logger.info("All points inserted")
        return new_track_id

load_track(track_id, engine, schema, track_table, points_table, **track_kwargs)

Insert a track into the database

Parameters:

Name Type Description Default
track_id int

id of the track in the database

required
engine Engine

SQLLalchemy Engine for the Postgres database

required
schema str

Name of the schema in the database

required
track_table str

Name of the table containing track information

required
points_table str

Name of the table containing the points of all tracks

required
track_kwargs

Additional keyword arguments passed to the Track.

{}

Returns:

Type Description
Track

Track Object.

Source code in geo_track_analyzer/postgis/db.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def load_track(
    track_id: int,
    engine: Engine,
    schema: str,
    track_table: str,
    points_table: str,
    **track_kwargs,
) -> Track:
    """
    Insert a track into the database

    :param track_id: id of the track in the database
    :param engine: SQLLalchemy Engine for the Postgres database
    :param schema: Name of the schema in the database
    :param track_table: Name of the table containing track information
    :param points_table: Name of the table containing the points of
        all tracks
    :param track_kwargs: Additional keyword arguments passed to the Track.

    :return: Track Object.
    """
    track_stmt = f"SELECT * FROM {schema}.{track_table} WHERE id = {track_id}"
    track = None
    with engine.connect() as conn:
        logger.debug("Loading track: %s from %s.%s", track_id, schema, track_table)
        _track_data = conn.execute(text(track_stmt)).fetchone()
        if _track_data is None:
            raise DBTrackInitializationError(
                "No track with id %s in %s.%s" % (track_id, schema, track_table)
            )
        track_data = _track_data._mapping

        logger.debug("Loading points from %s.%s", schema, points_table)
        extensions = _find_extensions(conn, schema, points_table)
        segments = _load_points(conn, track_id, schema, points_table, extensions)
        for i, segment_id in enumerate(sorted(segments.keys())):
            segment_data = segments[segment_id]
            extension_data = {}
            for extension in extensions:
                extension_data[extension] = segment_data[extension]
            _data = dict(
                points=list(zip(segment_data["latitude"], segment_data["longitude"])),
                elevations=segment_data["elevation"],
                times=segment_data["time"],
                extensions=extension_data,
            )
            if i == 0:
                track = PyTrack(
                    **_data,
                    **track_kwargs,
                )
            else:
                assert track is not None
                track.add_segmeent(**_data)  # type: ignore
    assert track is not None
    track.track.name = track_data["track_name"]
    return track