Слідуйте за мною на LinkedIn, якщо вам сподобалося те, що ви тут прочитали.
У цьому посібнику я розповім, як я використовував API відкритих даних Stad Gent для отримання даних про паркінги, збереження їх у PostgreSQL базі даних та візуалізації за допомогою Metabase.
Ось результат: parkinggent.xudo.be.
"Система спроектована для роботи на серверах Hetzner Cloud з Ubuntu і включає автоматизоване збирання та очищення даних для ефективного управління зберіганням у базі даних. Я хотів вивчити інструменти, з якими я не часто зустрічаюся зі своїми поточними клієнтами, які здебільшого використовують QlikView і Qlik Sense. Ось чому я створив цей маленький хобі-проєкт — щоб підтримувати форму і йти в ногу з швидко розвиваючим світом інструментів для роботи з даними.
Мене звати Вутер Трапперс, і я фрилансер з бізнес-аналітики в компанії Xudo. Створення каналів для обробки даних і панелей управління — моя основна робота, але навіть так я зіштовхнувся з труднощами на деяких етапах цього процесу. Пам'ятайте, що ці кроки є результатом проб і помилок, і не все проходить так гладко, як може здатися в посібнику. Якщо щось не працює одразу, не зневіряйтесь — продовжуйте експериментувати, поки не вийде!"
Огляд системи
- Збір даних: Скрипти на Python отримують дані про паркінги з Stad Gent API кожні 5 хвилин і зберігають їх у базі даних PostgreSQL.
- Зберігання даних: Створюються таблиці PostgreSQL для збереження сирих і агрегованих даних.
- Агрегування даних: Скрипт агрегує дані щогодини та видаляє детальні дані старші за 2 тижні.
- Візуалізація: Metabase встановлений на окремому сервері, налаштований для роботи через HTTPS, і використовується для створення панелей управління для реальних і історичних даних про паркінг.
Крок 1: Налаштування серверів Hetzner Cloud
Створіть два сервери в консолі Hetzner Cloud.
- Сервер 1: Для PostgreSQL та Python-скриптів.
- Сервер 2: Для Metabase.
- Використовуйте Ubuntu 22.04 LTS для обох серверів.
Я використовую найменші сервери типу CX22. Вони мають 2 процесори, 4 ГБ ОЗП і 40 ГБ SSD за 3,29€ на місяць. Я також регулярно створюю знімки своїх серверів у Hetzner, щоб мати актуальну версію для відновлення, якщо щось піде не так — а це сталося під час спроби встановити HTTPS на сервері Metabase.
Консоль Hetzner
Чому два сервери, а не один?
Використання двох серверів замість одного — це дизайнерське рішення, яке приносить кілька переваг, зокрема в плані масштабованості, безпеки та продуктивності. Для стислості, залишу це так, але якщо ви хочете детальніше дізнатися про причини використання двох серверів — замість одного, що технічно можливо — ви можете прочитати більше на сайті Xudo.
Крок 2: Встановлення та налаштування PostgreSQL
- Встановіть PostgreSQL на сервер Ubuntu.
Підключіться до сервера через SSH за допомогою вашого користувача та пароля і виконайте наступні команди:
sudo apt update
sudo apt install postgresql
2. Створіть базу даних і користувача:
sudo -u postgres psql
CREATE DATABASE parking_data;
CREATE USER parking_user WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE parking_data TO parking_user;
**3.
Встановлення pgAdmin4
Ви можете створювати таблиці через командний рядок, але ми встановили pgAdmin4 для зручнішого управління нашими базами даних:
sudo curl -fsSL https://www.pgadmin.org/static/packages_pgadmin_org.pub | sudo gpg --dearmor -o /usr/share/keyrings/pgadmin-keyring.gpg
sudo sh -c 'echo "deb [signed-by=/usr/share/keyrings/pgadmin-keyring.gpg] https://ftp.postgresql.org/pub/pgadmin/pgadmin4/apt/$(lsb_release -cs) pgadmin4 main" > /etc/apt/sources.list.d/pgadmin4.list'
sudo apt update
sudo apt install pgadmin4-web
sudo apt install apache2
sudo a2enconf pgadmin4
4. Створення таблиць
Створіть наступні таблиці за допомогою pgAdmin4.
Таблиця сирих даних: parking_data
CREATE TABLE IF NOT EXISTS parking_data.parking_data (
id text COLLATE pg_catalog."default",
name text COLLATE pg_catalog."default" NOT NULL,
last_update timestamp with time zone NOT NULL,
total_capacity integer NOT NULL,
available_capacity integer NOT NULL,
occupation integer NOT NULL,
type text COLLATE pg_catalog."default" NOT NULL,
description text COLLATE pg_catalog."default",
external_id text COLLATE pg_catalog."default",
opening_times_description text COLLATE pg_catalog."default",
is_open_now integer NOT NULL,
temporarily_closed integer NOT NULL,
operator_information text COLLATE pg_catalog."default",
free_parking integer NOT NULL,
url_link_address text COLLATE pg_catalog."default",
occupancy_trend text COLLATE pg_catalog."default",
specific_access_information text COLLATE pg_catalog."default",
level text COLLATE pg_catalog."default",
road_number text COLLATE pg_catalog."default",
road_name text COLLATE pg_catalog."default",
latitude double precision NOT NULL,
longitude double precision NOT NULL,
text text COLLATE pg_catalog."default",
category text COLLATE pg_catalog."default",
dashboard text COLLATE pg_catalog."default" NOT NULL,
last_modified timestamp with time zone,
CONSTRAINT parkinggent_external_id_key UNIQUE (external_id) )
Таблиця агрегованих даних: parking_hourly
CREATE TABLE IF NOT EXISTS parking_data.parking_hourly (
name text COLLATE pg_catalog."default" NOT NULL,
is_open_now integer,
temporarily_closed integer,
last_modified timestamp without time zone,
average_total_capacity double precision,
average_available_capacity double precision,
capacity_trend integer,
last_update timestamp with time zone )
Крок 3: Написання Python-скриптів
1. Отримання даних з Stad Gent API
Встановіть необхідні бібліотеки Python на сервері бази даних
pip install requests psycopg2-binary
2. Python скрипт для отримання та збереження даних:
Я додав розширене логування до скрипту, щоб виявити деякі поля, які не були коректно отримані. Я вирішив помістити поля, які мені не вдалося налаштувати, в коментарі, оскільки вони мені не потрібні для контексту панелі, яку я хотів побудувати.
Дані в API оновлюються кожні 5 хвилин.
Щоб отримувати останні оновлення, я використовую cron-job для запуску цього скрипта кожні 5 хвилин.
import requests
import psycopg2
import logging
from psycopg2 import sql
# Налаштування логування
logging.basicConfig(
filename="parking_data_insertion.log",
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# URL API
API_URL = ""# Параметри підключення до бази даних
DB_CONFIG = {
"dbname": "parking_data",
"user": "parking_user",
"password": "your_password",
"host": "127.0.0.1",
"port": 5432
}
def fetch_and_insert_data():
try:
logging.info("Fetching data from API.")
response = requests.get(API_URL)
response.raise_for_status()
data = response.json()
# Логування сирого відповіді для налагодження
# logging.debug(f"Raw API response: {data}")
# Перевірка наявності 'results' та логування його вмісту
results = data.get("results", [])
if not results:
logging.warning("No results found in the API response.")
else:
logging.info(f"Fetched {len(results)} records from API.")
# Вставка записів у базу даних
insert_data_to_db(results)
except requests.RequestException as e:
logging.error(f"Error fetching data from API: {e}")
except Exception as e:
logging.error(f"Unexpected error while fetching data: {e}")
def insert_data_to_db(records):
logging.info("Starting the database insertion process.")
connection = None
try:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
logging.info("Successfully connected to the database.")
insert_query = """
INSERT INTO parking_data.parking_data(
id, name, last_update, total_capacity, available_capacity, occupation, type,
description, opening_times_description, is_open_now, temporarily_closed, url_link_address,
operator_information, free_parking, occupancy_trend,latitude, longitude,
category, dashboard
) VALUES (
%(id)s, %(name)s, %(lastupdate)s, %(totalcapacity)s, %(availablecapacity)s, %(occupation)s, %(type)s,
%(description)s, %(openingtimesdescription)s, %(isopennow)s, %(temporaryclosed)s,%(urllinkaddress)s,
%(operatorinformation)s, %(freeparking)s, %(occupancytrend)s,%(latitude)s, %(longitude)s,
%(categorie)s, %(dashboard)s
)
"""
logging.debug("SQL INSERT query prepared.")
for i, record in enumerate(records, start=1):
# Витягування даних безпосередньо з словника record
name = record.get("name")
lastupdate = record.get("lastupdate")
totalcapacity = record.get("totalcapacity")
availablecapacity = record.get("availablecapacity")
occupation = record.get("occupation")
type = record.get("type")
description = record.get("description")
id = record.get("id")
openingtimesdescription = record.get("openingtimesdescription")
isopennow = record.get("isopennow")
temporaryclosed = record.get("temporaryclosed")
operatorinformation = record.get("operatorinformation")
freeparking = record.get("freeparking")
urllinkaddress = record.get("urllinkaddress")
occupancytrend = record.get("occupancytrend")
# Витягування вкладених даних (якщо є)
# specific_access_info = record.get("locationanddimension", {}).get("specificAccessInformation", [""])[0]
# level = record.get("level")
# roadnumber = record.get("roadnumber")
# roadname = record.get("roadname")
# Витягування вкладених даних (якщо є)
latitude = record.get("location", {}).get("lat")
longitude = record.get("location", {}).get("lon")
# text = record.get("text")
categorie = record.get("categorie")
dashboard = record.get("dashboard")
# Перевірка на відсутність важливих даних
if not name or not totalcapacity or not latitude or not longitude:
logging.warning(f"Record {i} skipped: Missing essential data.")
continue
data = {
"name": name,
"lastupdate": lastupdate,
"totalcapacity": totalcapacity,
"availablecapacity": availablecapacity,
"occupation": occupation,
"type" : type,
"description" : description,
"id" : id,
"openingtimesdescription": openingtimesdescription,
"isopennow" : isopennow,
"temporaryclosed": temporaryclosed,
"operatorinformation" : operatorinformation,
"freeparking" : freeparking,
"urllinkaddress" : urllinkaddress,
"occupancytrend" : occupancytrend,
# "specificAccessInformation": specific_access_info,
# "level" : level,
# "roadnumber" :roadnumber,
# "roadname" : roadname,
"latitude" : latitude,
"longitude" : longitude,
# "text" : text,
"categorie" : categorie,
"dashboard" : dashboard
}
# Перевірка на відсутні ключі перед вставкою
missing_keys = [key for key in data if data[key] is None]
if missing_keys:
logging.warning(f"Запис {i} пропущено через відсутні ключі: {', '.join(missing_keys)}")
continue
try:
cursor.execute(insert_query, data)
logging.info(f"Запис {i} успішно вставлений.")
except Exception as e:
logging.error(f"Помилка при вставці запису {i}: {e}")
logging.error(f"Дані для запису {i}: {data}") # Логування даних для перевірки
continue
connection.commit()
logging.info("Всі записи успішно збережено в базі даних.")
except psycopg2.Error as e:
logging.error(f"Помилка бази даних: {e}")
except Exception as e:
logging.error(f"Неочікувана помилка: {e}")
finally:
if connection:
cursor.close()
connection.close()
logging.info("З'єднання з базою даних закрито.")
# Виклик функції для отримання даних і вставки в базу даних
fetch_and_insert_data()
Можливості для вдосконалення
Одним із удосконалень може бути з'ясування, чому деякі поля не завантажуються правильно.
Ще одна річ, яка стосується того, як я планую запуск цього завдання. Пишучи цей посібник, я зрозумів, що також міг би використати бібліотеку Python Schedule і запускати скрипт як службу. Це безумовно те, що я розгляну наступного разу, коли буду створювати подібний хобі-проєкт.
3. Скрипт для агрегації
Щоб мати можливість аналізувати тренди в минулому, не зберігаючи всі дані, що створюються кожні п'ять хвилин, я агрегував дані по годинах. Я також додав тренд: чи приїжджають люди, чи від'їжджають?
import psycopg2
import logging
def insert_parking_data():
"""
Встановлює з'єднання з базою даних і вставляє обчислені дані про паркінг.
"""
# Налаштування логування
logging.basicConfig(filename='parking_data_hourly_aggregation.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Параметри підключення до бази даних
DB_CONFIG = {
"dbname": "parking_data",
"user": "parking_user",
"password": "your_password",
"host": "127.0.0.1",
"port": 5432
}
sql = """
WITH sq_first AS (
SELECT DISTINCT ON (name)
name,
FIRST_VALUE(available_capacity) OVER (PARTITION BY name ORDER BY ABS(EXTRACT(EPOCH FROM last_modified - (NOW() - INTERVAL '1 hour'))) ASC) AS first_available_capacity
FROM
parking_data.parking_data
WHERE
last_modified <= NOW() - INTERVAL '1 hour'
),
sq_last AS (
SELECT DISTINCT ON (name)
name,
FIRST_VALUE(available_capacity) OVER (PARTITION BY name ORDER BY last_modified DESC) AS last_available_capacity
FROM
parking_data.parking_data
WHERE
last_modified >= NOW() - INTERVAL '1 hour'
)
INSERT INTO parking_data.parking_hourly_aggregation (name, is_open_now, temporarily_closed, last_update, last_modified, average_total_capacity, average_available_capacity, capacity_trend)
SELECT
parking_data.name,
parking_data.is_open_now,
parking_data.temporarily_closed,
MAX(parking_data.last_update) AT TIME ZONE 'Europe/Brussels' AS last_update,
MAX(parking_data.last_modified) AT TIME ZONE 'Europe/Brussels' AS last_modified,
ROUND(AVG(parking_data.total_capacity), 0) AS average_total_capacity,
ROUND(AVG(parking_data.available_capacity), 0) AS average_available_capacity,
sq_last.last_available_capacity - sq_first.first_available_capacity AS capacity_trend
FROM
parking_data.parking_data
LEFT JOIN sq_first ON parking_data.name = sq_first.name
LEFT JOIN sq_last ON parking_data.name = sq_last.name
WHERE
parking_data.last_modified >= NOW() - INTERVAL '1 hour' -- Фільтрування даних за останню годину
GROUP BY
parking_data.name,
parking_data.is_open_now,
parking_data.temporarily_closed,
sq_first.first_available_capacity,
sq_last.last_available_capacity;
"""
connection = None # Оголошуємо з'єднання поза блоком try-except
try:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
logging.info("Успішно підключено до бази даних.")
# Виконання SQL-запиту INSERT
cursor.execute(sql)
connection.commit() # Підтвердження транзакції
logging.info("Дані успішно вставлено.")
except (Exception, psycopg2.Error) as error:
logging.error("Помилка при вставці даних", exc_info=error)
finally:
if connection:
cursor.close()
connection.close()
logging.info("З'єднання з базою даних закрито.")
if __name__ == "__main__":
insert_parking_data()
- Скрипт для видалення
Щоб переконатися, що сервер не заповнюється занадто швидко, я зберігаю повні дані лише за останні 48 годин.
import psycopg2
import logging
def delete_old_parking_data():
"""
Встановлює з'єднання з базою даних і видаляє записи старші за 48 годин.
"""
# Налаштування логування
logging.basicConfig(filename='parking_data_delete_old_data.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Параметри підключення до бази даних
DB_CONFIG = {
"dbname": "ParkingGent",
"user": "postgres",
"password": "rypkGWyh1r",
"host": "127.0.0.1",
"port": 5432
}
delete_sql = """
DELETE FROM parkinggent.parkinggent
WHERE last_modified < NOW() - INTERVAL '48 hours';
"""
connection = None # Оголошуємо з'єднання поза блоком try-except
try:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
logging.info("Успішно підключено до бази даних.")
# Виконання SQL-запиту DELETE
cursor.execute(delete_sql)
connection.commit() # Підтвердження транзакції
logging.info("Старі дані успішно видалено.")
except (Exception, psycopg2.Error) as error:
logging.error("Помилка при видаленні даних", exc_info=error)
finally:
if connection:
cursor.close()
connection.close()
logging.info("З'єднання з базою даних закрито.")
if __name__ == "__main__":
delete_old_parking_data()
Крок 4: Планування скриптів за допомогою cron jobs
Використовуйте cron
, щоб запускати ці скрипти кожні п'ять хвилин, кожну годину та щодня.
crontab -e
Використання cron jobs для запуску скриптів
Крок 5: Налаштування бази даних Postgres для дозволу підключень від сервера Metabase
Додайте IP-адресу 11.22.33.44/32 сервера Metabase до файлу pg_hba.conf
на сервері бази даних для дозволу доступу до даних. Перезапустіть службу posgres, щоб ці зміни набрали чинності.
Додайте IP-адресу сервера Metabase до файлу `pghba.conf`_
Крок 6: Встановлення та налаштування Metabase
- Встановіть Metabase:
sudo apt install openjdk-11-jre
wget
2. Налаштування HTTPS:
Отримайте SSL сертифікат (наприклад, за допомогою Let’s Encrypt). У нашому випадку ми працюємо з веб-сервером Jetty для Java, а не з ngnix чи apache, тому ми використовуємо параметр — standalone для створення сертифіката.
sudo apt update
sudo apt install certbot
sudo certbot certonly --standalone -d yourdomain.com -d www.yourdomain.com
sudo certbot --nginx -d yourdomain.com -d www.yourdomain.com
3. Перетворення сертифіката в Java KeyStore (JKS):
openssl pkcs12 -export \\
-in /etc/letsencrypt/live/your
-inkey /etc/letsencrypt/live/yourdomain/
-out metabase.p12 \\
-name metabase
keytool -importkeystore \\
-deststorepass your_keystore_password \\
-destkeypass your_keystore_password \\
-
-srckeystore metabase.p12 \\
-srcstoretype PKCS12 \\
-srcstorepass your_pkcs12_password \\
-alias metabase
4. Запуск Metabase як служби:
Створіть файл служби systemd у цьому місці за допомогою текстового редактора nano:
sudo nano /etc/systemd/system/metabase.service
[Unit]
Description=Metabase
After=syslog.target
[Service]
ExecStart=/usr/bin/java -jar /path/to/metabase.jar
EnvironmentFile=/etc/default/metabase
User=metabase
WorkingDirectory=/path/to/metabase
Restart=on-failure
#environment
Environment="MB_JETTY_SSL=true"
Environment="MB_JETTY_SSL_PORT=8443"
Environment="MB_JETTY_SSL_KEYSTORE=/opt/metabase/keystore.jks"
Environment="MB_JETTY_SSL_KEYSTORE_PASSWORD=your_keystore_password"
[Install]
WantedBy=multi-user.target
Після змін у файлі служби необхідно перезавантажити конфігурацію менеджера systemd, щоб застосувати зміни. Потім ви можете запустити і активувати службу:
sudo systemctl daemon-reload
sudo systemctl start metabase
sudo systemctl enable metabase
Крок 7: Створення панелей управління в Metabase
1.
Підключення Metabase до PostgreSQL:
Підключіть Metabase до вашої бази даних PostgreSQL
2. Встановіть URL на HTTPS:
Налаштування сайту Metabase
3. Створіть об'єкти та візуалізації, які ви хочете використовувати в вашій панелі управління.
Важливо зазначити, що я підготував кілька подань у базі даних PostgreSQL для виконання певних обчислень та інтеграцій, щоб уникнути необхідності робити це тут. Панелі управління Metabase пов'язані з поданнями, а не з таблицями.
Створіть об'єкти та візуалізації у Metabase
Наприклад, тренд по годинах:
Налаштуйте візуалізацію тренду по годинах за останні 5 днів
Metabase дозволяє вам вивести дані на карту, якщо у вас є координати в даних, і ми їх маємо:
Виведіть ваші дані на карту.
Висновок
Звісно, я пропустив деякі деталі тут і там, але це має дати вам чітке розуміння того, що потрібно для створення власної панелі управління Metabase, підключеної до відкритого API даних. Тепер у мене є повністю автоматизована система, яка отримує, зберігає, агрегує та візуалізує дані про паркінг з Stad Gent. Тож, коли мої друзі запитують, де припаркувати свою машину перед тим, як ми поїдемо, я просто відправляю їм це посилання на свою панель управління, і ми готові до поїздки! Сподіваюся, це надихне вас на дослідження ваших власних даних — удачі в побудові!
Перекладено з: Tutorial: Building a Parking Data Pipeline with Stad Gent Open Data API, PostgreSQL, and Metabase