Ingestion
Tietolähteitä on monenlaisia. Huomaa, että kaikkia näitä voidaan tuoda tietoalustaan ETL tai ELT -työnkulkua noudattaen. Kirjaimet edustavat sanoja Extract, Transform, Load. Tiivistetysti ETL:n ja ELT:n ero on siinä, että ETL:ssä dataa muokataan ennen tietovarastoon vientiä, kun taas ELT:ssä dataa muokataan vasta tietovarastoon viennin jälkeen.
Nykyisin ELT on yleisempi lähestymistapa. Syitä tälle on monia, kuten:
- ETL:n T on vaikea määritellä pysyvästi. Jos T:n määritelmä muuttuu, data joudutaan hakemaan alusta alkaen uusiksi tietovarastoon.
- Raakadata itsessään voi olla hyödyllistä koneoppimisen näkökulmasta.
Ajoittain törmää myös termiin EtLT, jolla korostetaan sitä, että Extractin ja Loadin välillä pienimuotoista datan käsittelyä. Dataa ei siis välttämättä muokata lopulliseen tietovaraston tarvitsemaa muotoon, mutta sitä saatetaan käsitellä esimerkiksi anonymisoinnin tai pseudonymisoinnin vuoksi, surrogaattiavainten laskemiseksi, tai muiden syiden vuoksi.
SaaS vs. DIY
Tietoalustan rakentamiseen on useita eri lähestymistapoja. Yksi on rakentaa kaikki itse. Toinen on ostaa kaikki valmiina palveluna. Kolmas on yhdistää näitä kahta. Mikäli käytät ELT-lähestymistapaa, ingestion tool lataa tiedon joko Staging tai Bronze tasolle, riippuen miten olet halunnut määritellä sen. Tietoalustasi vastaa T-kirjaimen toteutuksesta eli eri tietolähteiden tiedojen mallintamisesta ja yhdistämisestä. Huomaa, että tämä on täysin modularisoitavissa. Sinulla voi olla rinnakkain useita eri ELT-työtyökaluja, joista kukin vastaa jostakin/joistakin tietolähteistä.
Tehtävä
Tutustu seuraaviin SaaS-palveluihin. Mitä tietolähteitä ne tukevat?
Yksittäisiin työkaluihin tutustuminen on kuitenkin tämän kurssin skoopin ulkopuolella. Mikäli sinulla on oikea yrityksen case, kannattaa vertailuttaa useita eri tarjoajia. Valittuun tarjoajaan voi vaikuttaa myös yrityksesi jo valmiiksi käyttämä hyperscaler (AWS, Azure, Google.)
Tietolähteet
Alla on käsiteltynä tyypillisiä tietolähteitä alkaen tietokannoita ja päättyen web-sivuihin, joista tieto ladataan web-scrapingin avulla.
Tietokannat
SQL ja noSQL
Eri SQL-kantojen (MySQL, MariaDB, Postgresql) ja noSQL-kantojen (MongoDB, Cassandra) dataa voi ladata eri strategioita hyödyntäen. SQL-kannoista palautuu lista monikkoja (list[tuple]
), kun taas noSQL-kannoista palautuu JSON string tai lista sanakirjoja (list[dict]
). Koska ingestion huolehtii vain latauksesta, ja T eli Transform tehdään vasta myöhemmissä vaiheissa, SQL ja noSQL saavat hyvin samankaltaisen kohtelun tässä vaiheessa: dumppaa data stagingiin ja murehdi myöhemmin.
- non-CDC: Full Load joka yö (naiivi)
- CDC: Inkrementaalista kenttää hyötyntäen
- CDC: Muutoshistorian lukeminen
Tehtävä
Lataukseen hyödynnetään tietokannan ajuria (JDBC/OBDC), joka mahdollistaa tietokannan lukemisen Pythonilla. Tyypillisesti nämä voi asentaa pip
:n avulla. Tutustu seuraaviin paketteihin pintapuoleisesti: pymysql
, psycopg2
.
Tip
Termi CDC tarkoittaa Change Data Capture. Sillä tarkoitetaan lyhesti ottaen sitä, että lataustyökalu tai -skripti pyrkii hakemaan sen, mikä on uutta sitten edellisen haun. Tämä onnistuu esimerkiksi timestamp-kentän avulla, jolloin lataustyökalu tietää, että seuraavalla kerralla pitää hakea kaikki rivit, joiden timestamp on suurempi kuin edellisellä kerralla.
Timestamp CDC
Aikakoodikenttään tai monotonisesti kasvavaan ID-kenttään perustuva inkrementaalinen lataus on äärimmäisen yksinkertainen. Ensimmäisellä latauskerralla haetaan aivan kaikki rivit. Jatkossa haetaan vain rivit, joissa inkrementaalinen kenttä on suurempi kuin edellisellä kerralla.
Suuret SELECT * FROM table
-tyyliset kyselyt ovat tietokantapalvelimelle muistin- tai kiintolevynkäytön kannalta erittäin raskaita. Ethän aja vastaavia kyselyitä tuotantokantaa vasten. Kriittiset kannat on suositeltavaa replikoida erilliseen tietokantaan, josta lataus voidaan tehdä. Jos ELT-skriptin kuorma kaataa palvelimen tai täyttää sen levytilan, tuotanto ei kärsi. Tämä on kallista, mutta turvallisempaa kuin tuotantokannan kuormittaminen.
Äärimmäisen naiivi latausskriptin alku voisi olla seuraavanlainen:
# Import pymysql library
import pymysql
from imaginery_library import write_as_parquet
# Connect to the database
connection = pymysql.connect(host='localhost',
user='user',
password='password',
db='database'
)
if FULL_LOAD:
query = "SELECT * FROM table"
elif CDC:
query = f"SELECT * FROM table WHERE timestamp > {last_timestamp}"
# Execute the query and dump to staging
write_as_csv(connection.execute(query).fetchall())
Warning
Timestamppiin perustuva inkrementaalinen lataus ei toteuta tietokannan poistoja laisinkaan. Tämä on potentiaalinen GDPR-rike. Tämän takia on suositeltavaa lukea muutoshistoriaa, jolloin poistotkin saadaan mukaan.
Binary Log CDC
Binary log on tietokantapalvelimen alunperin sisäiseen käyttöön tarkoitettu lokitiedosto, joka sisältää kaikki tietokannan muutokset. Binary logia voi lukea myös ulkopuolinen sovellus, jolloin se toimii CDC:n tavoin. Tietokannat käyttävät lokitiedostoa muun muassa homogeeniseen replikointii (esim. MySQL => MySQL) sekä vikaantumistilanteiden varalle. Binary log pitää erikseen olla aktivoituna tietokannassa; se ei välttämättä ole oletuksena päällä. Lisäksi binlogia ei säilytetä ikuisesti, joten sinulla on n päivää replikoida muutokset ennen kuin ne katoavat. Itse muutokset sisältävät operaation tyypin (write, update, delete) sekä rivin vanhan ja uuden tilan.
Mikäli CDC:n haluaa toteuttaa muutoshistoriaan nojaten, voi olla järkevintä unohtaa kotikutoinen Python-skripti, ja ottaa käyttöön jokin kaupallinen palvelu tai avoimen lähdekoodin työkalu (esim. Apache Kafka + Debezium tai Airbyte.)
Jos kuitenkin haluat tutustua konseptiin Pythonin avulla, tämä onnistuu esimerkiksi python-mysql-replication
-kirjastolla.
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
# Start from log_file position log_pos.
# Examples: log_file='mysql-bin.000003', log_pos=4
# Keep only events that affect rows. Ignore e.g. DDL events.
stream = BinLogStreamReader(
connection_settings=mysql_settings,
log_file=log_file,
log_pos=log_pos,
resume_stream=True,
only_events=[
WriteRowsEvent,
UpdateRowsEvent,
DeleteRowsEvent
]
)
# Process the binary log events
for binlog_event in stream:
# Process
pass
# Close connection
stream.close()
Tiedostot
TODO: Tiedon jäsentyneisyyden tasot: Structured, Semi-structured, Unstructured.
API
TODO: REST, GraphQL
IoT
TODO: MQTT
Streaming
TODO: Pub/sub
Web pages
TODO: Scraping