Посібник: Створення потоку даних паркування з використанням Stad Gent Open Data API, PostgreSQL та Metabase

Слідуйте за мною на LinkedIn, якщо вам сподобалося те, що ви тут прочитали.

У цьому посібнику я розповім, як я використовував API відкритих даних Stad Gent для отримання даних про паркінги, збереження їх у PostgreSQL базі даних та візуалізації за допомогою Metabase.

Ось результат: parkinggent.xudo.be.

"Система спроектована для роботи на серверах Hetzner Cloud з Ubuntu і включає автоматизоване збирання та очищення даних для ефективного управління зберіганням у базі даних. Я хотів вивчити інструменти, з якими я не часто зустрічаюся зі своїми поточними клієнтами, які здебільшого використовують QlikView і Qlik Sense. Ось чому я створив цей маленький хобі-проєкт — щоб підтримувати форму і йти в ногу з швидко розвиваючим світом інструментів для роботи з даними.

Мене звати Вутер Трапперс, і я фрилансер з бізнес-аналітики в компанії Xudo. Створення каналів для обробки даних і панелей управління — моя основна робота, але навіть так я зіштовхнувся з труднощами на деяких етапах цього процесу. Пам'ятайте, що ці кроки є результатом проб і помилок, і не все проходить так гладко, як може здатися в посібнику. Якщо щось не працює одразу, не зневіряйтесь — продовжуйте експериментувати, поки не вийде!"

Огляд системи

  1. Збір даних: Скрипти на Python отримують дані про паркінги з Stad Gent API кожні 5 хвилин і зберігають їх у базі даних PostgreSQL.
  2. Зберігання даних: Створюються таблиці PostgreSQL для збереження сирих і агрегованих даних.
  3. Агрегування даних: Скрипт агрегує дані щогодини та видаляє детальні дані старші за 2 тижні.
  4. Візуалізація: Metabase встановлений на окремому сервері, налаштований для роботи через HTTPS, і використовується для створення панелей управління для реальних і історичних даних про паркінг.

Крок 1: Налаштування серверів Hetzner Cloud

Створіть два сервери в консолі Hetzner Cloud.

  • Сервер 1: Для PostgreSQL та Python-скриптів.
  • Сервер 2: Для Metabase.
  • Використовуйте Ubuntu 22.04 LTS для обох серверів.

Я використовую найменші сервери типу CX22. Вони мають 2 процесори, 4 ГБ ОЗП і 40 ГБ SSD за 3,29€ на місяць. Я також регулярно створюю знімки своїх серверів у Hetzner, щоб мати актуальну версію для відновлення, якщо щось піде не так — а це сталося під час спроби встановити HTTPS на сервері Metabase.

pic

Консоль Hetzner

Чому два сервери, а не один?

Використання двох серверів замість одного — це дизайнерське рішення, яке приносить кілька переваг, зокрема в плані масштабованості, безпеки та продуктивності. Для стислості, залишу це так, але якщо ви хочете детальніше дізнатися про причини використання двох серверів — замість одного, що технічно можливо — ви можете прочитати більше на сайті Xudo.

Крок 2: Встановлення та налаштування PostgreSQL

  1. Встановіть PostgreSQL на сервер Ubuntu.

Підключіться до сервера через SSH за допомогою вашого користувача та пароля і виконайте наступні команди:

sudo apt update   
sudo apt install postgresql

2. Створіть базу даних і користувача:

sudo -u postgres psql   
CREATE DATABASE parking_data;   
CREATE USER parking_user WITH PASSWORD 'your_password';   
GRANT ALL PRIVILEGES ON DATABASE parking_data TO parking_user;

**3.
Встановлення pgAdmin4

Ви можете створювати таблиці через командний рядок, але ми встановили pgAdmin4 для зручнішого управління нашими базами даних:

sudo curl -fsSL https://www.pgadmin.org/static/packages_pgadmin_org.pub | sudo gpg --dearmor -o /usr/share/keyrings/pgadmin-keyring.gpg  
sudo sh -c 'echo "deb [signed-by=/usr/share/keyrings/pgadmin-keyring.gpg] https://ftp.postgresql.org/pub/pgadmin/pgadmin4/apt/$(lsb_release -cs) pgadmin4 main" > /etc/apt/sources.list.d/pgadmin4.list'  

sudo apt update  

sudo apt install pgadmin4-web  
sudo apt install apache2  

sudo a2enconf pgadmin4

4. Створення таблиць

Створіть наступні таблиці за допомогою pgAdmin4.

pic

Таблиця сирих даних: parking_data

CREATE TABLE IF NOT EXISTS parking_data.parking_data (  
 id text COLLATE pg_catalog."default",  
 name text COLLATE pg_catalog."default" NOT NULL,  
 last_update timestamp with time zone NOT NULL,  
 total_capacity integer NOT NULL,  
 available_capacity integer NOT NULL,  
 occupation integer NOT NULL,  
 type text COLLATE pg_catalog."default" NOT NULL,  
 description text COLLATE pg_catalog."default",  
 external_id text COLLATE pg_catalog."default",  
 opening_times_description text COLLATE pg_catalog."default",  
 is_open_now integer NOT NULL,  
 temporarily_closed integer NOT NULL,  
 operator_information text COLLATE pg_catalog."default",  
 free_parking integer NOT NULL,  
 url_link_address text COLLATE pg_catalog."default",  
 occupancy_trend text COLLATE pg_catalog."default",  
 specific_access_information text COLLATE pg_catalog."default",  
 level text COLLATE pg_catalog."default",  
 road_number text COLLATE pg_catalog."default",  
 road_name text COLLATE pg_catalog."default",  
 latitude double precision NOT NULL,  
 longitude double precision NOT NULL,  
 text text COLLATE pg_catalog."default",  
 category text COLLATE pg_catalog."default",  
 dashboard text COLLATE pg_catalog."default" NOT NULL,  
 last_modified timestamp with time zone,  
 CONSTRAINT parkinggent_external_id_key UNIQUE (external_id) )

Таблиця агрегованих даних: parking_hourly

CREATE TABLE IF NOT EXISTS parking_data.parking_hourly (  
 name text COLLATE pg_catalog."default" NOT NULL,  
 is_open_now integer,  
 temporarily_closed integer,  
 last_modified timestamp without time zone,  
 average_total_capacity double precision,  
 average_available_capacity double precision,  
 capacity_trend integer,  
 last_update timestamp with time zone )

Крок 3: Написання Python-скриптів

1. Отримання даних з Stad Gent API

Встановіть необхідні бібліотеки Python на сервері бази даних

pip install requests psycopg2-binary

2. Python скрипт для отримання та збереження даних:

Я додав розширене логування до скрипту, щоб виявити деякі поля, які не були коректно отримані. Я вирішив помістити поля, які мені не вдалося налаштувати, в коментарі, оскільки вони мені не потрібні для контексту панелі, яку я хотів побудувати.

Дані в API оновлюються кожні 5 хвилин.
Щоб отримувати останні оновлення, я використовую cron-job для запуску цього скрипта кожні 5 хвилин.

import requests  
import psycopg2  
import logging  
from psycopg2 import sql  

# Налаштування логування  
logging.basicConfig(  
 filename="parking_data_insertion.log",  
 level=logging.DEBUG,  
 format="%(asctime)s - %(levelname)s - %(message)s"  
)  

# URL API  
API_URL = ""# Параметри підключення до бази даних  
DB_CONFIG = {  
 "dbname": "parking_data",  
 "user": "parking_user",  
 "password": "your_password",  
 "host": "127.0.0.1",  
 "port": 5432  
}  

def fetch_and_insert_data():  
 try:  
 logging.info("Fetching data from API.")  
 response = requests.get(API_URL)  
 response.raise_for_status()  
 data = response.json()  

 # Логування сирого відповіді для налагодження  
 # logging.debug(f"Raw API response: {data}")  

 # Перевірка наявності 'results' та логування його вмісту  
 results = data.get("results", [])  
 if not results:  
 logging.warning("No results found in the API response.")  
 else:  
 logging.info(f"Fetched {len(results)} records from API.")  

 # Вставка записів у базу даних  
 insert_data_to_db(results)  
 except requests.RequestException as e:  
 logging.error(f"Error fetching data from API: {e}")  
 except Exception as e:  
 logging.error(f"Unexpected error while fetching data: {e}")  

def insert_data_to_db(records):  
 logging.info("Starting the database insertion process.")  
 connection = None  

 try:  
 connection = psycopg2.connect(**DB_CONFIG)  
 cursor = connection.cursor()  
 logging.info("Successfully connected to the database.")  

 insert_query = """  
 INSERT INTO parking_data.parking_data(  
 id, name, last_update, total_capacity, available_capacity, occupation, type,  
 description, opening_times_description, is_open_now, temporarily_closed, url_link_address,  
 operator_information, free_parking, occupancy_trend,latitude, longitude,  
 category, dashboard  
 ) VALUES (  
 %(id)s, %(name)s, %(lastupdate)s, %(totalcapacity)s, %(availablecapacity)s, %(occupation)s, %(type)s,  
 %(description)s, %(openingtimesdescription)s, %(isopennow)s, %(temporaryclosed)s,%(urllinkaddress)s,  
 %(operatorinformation)s, %(freeparking)s, %(occupancytrend)s,%(latitude)s, %(longitude)s,  
 %(categorie)s, %(dashboard)s  
 )  
 """  
 logging.debug("SQL INSERT query prepared.")  

 for i, record in enumerate(records, start=1):  
 # Витягування даних безпосередньо з словника record  
 name = record.get("name")  
 lastupdate = record.get("lastupdate")  
 totalcapacity = record.get("totalcapacity")  
 availablecapacity = record.get("availablecapacity")  
 occupation = record.get("occupation")  
 type = record.get("type")  
 description = record.get("description")  
 id = record.get("id")  
 openingtimesdescription = record.get("openingtimesdescription")  
 isopennow = record.get("isopennow")  
 temporaryclosed = record.get("temporaryclosed")  
 operatorinformation = record.get("operatorinformation")  
 freeparking = record.get("freeparking")  
 urllinkaddress = record.get("urllinkaddress")  
 occupancytrend = record.get("occupancytrend")  
 # Витягування вкладених даних (якщо є)  
 # specific_access_info = record.get("locationanddimension", {}).get("specificAccessInformation", [""])[0]  
 # level = record.get("level")  
 # roadnumber = record.get("roadnumber")  
 # roadname = record.get("roadname")  
 # Витягування вкладених даних (якщо є)  
 latitude = record.get("location", {}).get("lat")  
 longitude = record.get("location", {}).get("lon")  
 # text = record.get("text")  
 categorie = record.get("categorie")  
 dashboard = record.get("dashboard")  
 # Перевірка на відсутність важливих даних  
 if not name or not totalcapacity or not latitude or not longitude:  
 logging.warning(f"Record {i} skipped: Missing essential data.")  
 continue  


 data = {  
 "name": name,  
 "lastupdate": lastupdate,
"totalcapacity": totalcapacity,  
 "availablecapacity": availablecapacity,  
 "occupation": occupation,  
 "type" : type,  
 "description" : description,  
 "id" : id,  
 "openingtimesdescription": openingtimesdescription,  
 "isopennow" : isopennow,  
 "temporaryclosed": temporaryclosed,  
 "operatorinformation" : operatorinformation,  
 "freeparking" : freeparking,  
 "urllinkaddress" : urllinkaddress,  
 "occupancytrend" : occupancytrend,  
 # "specificAccessInformation": specific_access_info,   
 # "level" : level,  
 # "roadnumber" :roadnumber,  
 # "roadname" : roadname,  
 "latitude" : latitude,  
 "longitude" : longitude,  
 # "text" : text,  
 "categorie" : categorie,  
 "dashboard" : dashboard  
 }  

 # Перевірка на відсутні ключі перед вставкою  
 missing_keys = [key for key in data if data[key] is None]  
 if missing_keys:  
 logging.warning(f"Запис {i} пропущено через відсутні ключі: {', '.join(missing_keys)}")  
 continue  
 try:  
 cursor.execute(insert_query, data)  
 logging.info(f"Запис {i} успішно вставлений.")  
 except Exception as e:  
 logging.error(f"Помилка при вставці запису {i}: {e}")  
 logging.error(f"Дані для запису {i}: {data}") # Логування даних для перевірки  
 continue  
 connection.commit()  
 logging.info("Всі записи успішно збережено в базі даних.")  

 except psycopg2.Error as e:  
 logging.error(f"Помилка бази даних: {e}")  
 except Exception as e:  
 logging.error(f"Неочікувана помилка: {e}")  
 finally:  
 if connection:  
 cursor.close()  
 connection.close()  
 logging.info("З'єднання з базою даних закрито.")  

# Виклик функції для отримання даних і вставки в базу даних  
fetch_and_insert_data()

Можливості для вдосконалення

Одним із удосконалень може бути з'ясування, чому деякі поля не завантажуються правильно.
Ще одна річ, яка стосується того, як я планую запуск цього завдання. Пишучи цей посібник, я зрозумів, що також міг би використати бібліотеку Python Schedule і запускати скрипт як службу. Це безумовно те, що я розгляну наступного разу, коли буду створювати подібний хобі-проєкт.

3. Скрипт для агрегації

Щоб мати можливість аналізувати тренди в минулому, не зберігаючи всі дані, що створюються кожні п'ять хвилин, я агрегував дані по годинах. Я також додав тренд: чи приїжджають люди, чи від'їжджають?

import psycopg2  
import logging  

def insert_parking_data():  
 """  
 Встановлює з'єднання з базою даних і вставляє обчислені дані про паркінг.  
 """  
 # Налаштування логування  
 logging.basicConfig(filename='parking_data_hourly_aggregation.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')  
 # Параметри підключення до бази даних  
 DB_CONFIG = {  
 "dbname": "parking_data",  
 "user": "parking_user",  
 "password": "your_password",  
 "host": "127.0.0.1",  
 "port": 5432  
 }  
 sql = """  
 WITH sq_first AS (  
 SELECT DISTINCT ON (name)  
 name,  
 FIRST_VALUE(available_capacity) OVER (PARTITION BY name ORDER BY ABS(EXTRACT(EPOCH FROM last_modified - (NOW() - INTERVAL '1 hour'))) ASC) AS first_available_capacity  
 FROM  
 parking_data.parking_data  
 WHERE  
 last_modified <= NOW() - INTERVAL '1 hour'  
 ),  
 sq_last AS (  
 SELECT DISTINCT ON (name)  
 name,  
 FIRST_VALUE(available_capacity) OVER (PARTITION BY name ORDER BY last_modified DESC) AS last_available_capacity  
 FROM  
 parking_data.parking_data  
 WHERE  
 last_modified >= NOW() - INTERVAL '1 hour'  
 )  
 INSERT INTO parking_data.parking_hourly_aggregation (name, is_open_now, temporarily_closed, last_update, last_modified, average_total_capacity, average_available_capacity, capacity_trend)  
 SELECT   
 parking_data.name,  
 parking_data.is_open_now,  
 parking_data.temporarily_closed,  
 MAX(parking_data.last_update) AT TIME ZONE 'Europe/Brussels' AS last_update,  
 MAX(parking_data.last_modified) AT TIME ZONE 'Europe/Brussels' AS last_modified,  
 ROUND(AVG(parking_data.total_capacity), 0) AS average_total_capacity,  
 ROUND(AVG(parking_data.available_capacity), 0) AS average_available_capacity,  
 sq_last.last_available_capacity - sq_first.first_available_capacity AS capacity_trend  
 FROM  
 parking_data.parking_data  
 LEFT JOIN sq_first ON parking_data.name = sq_first.name  
 LEFT JOIN sq_last ON parking_data.name = sq_last.name  
 WHERE  
 parking_data.last_modified >= NOW() - INTERVAL '1 hour' -- Фільтрування даних за останню годину  
 GROUP BY  
 parking_data.name,  
 parking_data.is_open_now,  
 parking_data.temporarily_closed,  
 sq_first.first_available_capacity,  
 sq_last.last_available_capacity;  
 """  
 connection = None # Оголошуємо з'єднання поза блоком try-except  
 try:  
 connection = psycopg2.connect(**DB_CONFIG)  
 cursor = connection.cursor()  
 logging.info("Успішно підключено до бази даних.")  

 # Виконання SQL-запиту INSERT  
 cursor.execute(sql)  
 connection.commit() # Підтвердження транзакції  

 logging.info("Дані успішно вставлено.")  
 except (Exception, psycopg2.Error) as error:  
 logging.error("Помилка при вставці даних", exc_info=error)  
 finally:  
 if connection:  
 cursor.close()  
 connection.close()  
 logging.info("З'єднання з базою даних закрито.")  
if __name__ == "__main__":  
 insert_parking_data()
  1. Скрипт для видалення

Щоб переконатися, що сервер не заповнюється занадто швидко, я зберігаю повні дані лише за останні 48 годин.

import psycopg2  
import logging  

def delete_old_parking_data():  
 """  
 Встановлює з'єднання з базою даних і видаляє записи старші за 48 годин.
"""  
 # Налаштування логування  
 logging.basicConfig(filename='parking_data_delete_old_data.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')  
 # Параметри підключення до бази даних  
 DB_CONFIG = {  
 "dbname": "ParkingGent",  
 "user": "postgres",  
 "password": "rypkGWyh1r",  
 "host": "127.0.0.1",  
 "port": 5432  
 }  
 delete_sql = """  
 DELETE FROM parkinggent.parkinggent  
 WHERE last_modified < NOW() - INTERVAL '48 hours';  
 """  
 connection = None # Оголошуємо з'єднання поза блоком try-except  
 try:  
 connection = psycopg2.connect(**DB_CONFIG)  
 cursor = connection.cursor()  
 logging.info("Успішно підключено до бази даних.")  

 # Виконання SQL-запиту DELETE  
 cursor.execute(delete_sql)  
 connection.commit() # Підтвердження транзакції  

 logging.info("Старі дані успішно видалено.")  
 except (Exception, psycopg2.Error) as error:  
 logging.error("Помилка при видаленні даних", exc_info=error)  
 finally:  
 if connection:  
 cursor.close()  
 connection.close()  
 logging.info("З'єднання з базою даних закрито.")  
if __name__ == "__main__":  
 delete_old_parking_data()

Крок 4: Планування скриптів за допомогою cron jobs

Використовуйте cron, щоб запускати ці скрипти кожні п'ять хвилин, кожну годину та щодня.

crontab -e

pic

Використання cron jobs для запуску скриптів

Крок 5: Налаштування бази даних Postgres для дозволу підключень від сервера Metabase

Додайте IP-адресу 11.22.33.44/32 сервера Metabase до файлу pg_hba.conf на сервері бази даних для дозволу доступу до даних. Перезапустіть службу posgres, щоб ці зміни набрали чинності.

pic

Додайте IP-адресу сервера Metabase до файлу `pghba.conf`_

Крок 6: Встановлення та налаштування Metabase

  1. Встановіть Metabase:
sudo apt install openjdk-11-jre   
wget 

2. Налаштування HTTPS:

Отримайте SSL сертифікат (наприклад, за допомогою Let’s Encrypt). У нашому випадку ми працюємо з веб-сервером Jetty для Java, а не з ngnix чи apache, тому ми використовуємо параметр — standalone для створення сертифіката.

sudo apt update  
sudo apt install certbot  

sudo certbot certonly --standalone -d yourdomain.com -d www.yourdomain.com   
sudo certbot --nginx -d yourdomain.com -d www.yourdomain.com

3. Перетворення сертифіката в Java KeyStore (JKS):

openssl pkcs12 -export \\  
 -in /etc/letsencrypt/live/your  
 -inkey /etc/letsencrypt/live/yourdomain/  
 -out metabase.p12 \\  
 -name metabase   

keytool -importkeystore \\  
 -deststorepass your_keystore_password \\  
 -destkeypass your_keystore_password \\  
 -  
 -srckeystore metabase.p12 \\  
 -srcstoretype PKCS12 \\  
 -srcstorepass your_pkcs12_password \\  
 -alias metabase

4. Запуск Metabase як служби:

Створіть файл служби systemd у цьому місці за допомогою текстового редактора nano:

sudo nano /etc/systemd/system/metabase.service
[Unit]  
Description=Metabase  
After=syslog.target  

[Service]  
ExecStart=/usr/bin/java -jar /path/to/metabase.jar  
EnvironmentFile=/etc/default/metabase  
User=metabase  
WorkingDirectory=/path/to/metabase  
Restart=on-failure  

#environment  
Environment="MB_JETTY_SSL=true"  
Environment="MB_JETTY_SSL_PORT=8443"  
Environment="MB_JETTY_SSL_KEYSTORE=/opt/metabase/keystore.jks"  
Environment="MB_JETTY_SSL_KEYSTORE_PASSWORD=your_keystore_password"  

[Install]  
WantedBy=multi-user.target

Після змін у файлі служби необхідно перезавантажити конфігурацію менеджера systemd, щоб застосувати зміни. Потім ви можете запустити і активувати службу:

sudo systemctl daemon-reload  
sudo systemctl start metabase  
sudo systemctl enable metabase

Крок 7: Створення панелей управління в Metabase

1.
Підключення Metabase до PostgreSQL:

pic

Підключіть Metabase до вашої бази даних PostgreSQL

2. Встановіть URL на HTTPS:

pic

Налаштування сайту Metabase

3. Створіть об'єкти та візуалізації, які ви хочете використовувати в вашій панелі управління.

Важливо зазначити, що я підготував кілька подань у базі даних PostgreSQL для виконання певних обчислень та інтеграцій, щоб уникнути необхідності робити це тут. Панелі управління Metabase пов'язані з поданнями, а не з таблицями.

pic

Створіть об'єкти та візуалізації у Metabase

Наприклад, тренд по годинах:

pic

Налаштуйте візуалізацію тренду по годинах за останні 5 днів

Metabase дозволяє вам вивести дані на карту, якщо у вас є координати в даних, і ми їх маємо:

pic

Виведіть ваші дані на карту.

Висновок

Звісно, я пропустив деякі деталі тут і там, але це має дати вам чітке розуміння того, що потрібно для створення власної панелі управління Metabase, підключеної до відкритого API даних. Тепер у мене є повністю автоматизована система, яка отримує, зберігає, агрегує та візуалізує дані про паркінг з Stad Gent. Тож, коли мої друзі запитують, де припаркувати свою машину перед тим, як ми поїдемо, я просто відправляю їм це посилання на свою панель управління, і ми готові до поїздки! Сподіваюся, це надихне вас на дослідження ваших власних даних — удачі в побудові!

Перекладено з: Tutorial: Building a Parking Data Pipeline with Stad Gent Open Data API, PostgreSQL, and Metabase

Leave a Reply

Your email address will not be published. Required fields are marked *