Python API
If none of the above interfaces suit your needs, or you want to write custom scripts, you can use the AIMBAT Python API. This is the most powerful way to interact with your projects. View the full API reference here.
Core Concepts
The API is built on three main components:
- Models: SQLModel classes that represent the database schema (aimbat.models) as Python objects.
- Core Functions: High-level operations that manipulate those models (aimbat.core).
- Database Session: A SQLAlchemy session used to track changes and interact with the project database.
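A minimal sketch of how the three pieces fit together (get_default_event stands in for any core function; it appears again later on this page):

```python
from sqlmodel import Session               # Database Session
from aimbat.db import engine
from aimbat.core import get_default_event  # Core Functions
from aimbat.models import AimbatEvent      # Models

with Session(engine) as session:
    event: AimbatEvent | None = get_default_event(session)
    if event is not None:
        print(event.time)
```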
Project Location
By default AIMBAT reads and writes aimbat.db in the current directory. Set
AIMBAT_PROJECT to use a different path:
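```bash
# /path/to/my_project.db is a placeholder; use any path you like.
export AIMBAT_PROJECT=/path/to/my_project.db
```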
The aimbat.db.engine singleton picks this up automatically, so scripts that
import it will use the same database as the CLI.
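The same can be done from Python, provided the variable is set before aimbat.db is first imported (a sketch that assumes the import-time behaviour described above; the path is a placeholder):

```python
import os

# Must run before the first import of aimbat.db.
os.environ["AIMBAT_PROJECT"] = "/tmp/demo/aimbat.db"

from aimbat.db import engine  # now backed by /tmp/demo/aimbat.db
```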
Session Management
Every database operation requires a Session. Use it as a context manager so
it is always closed cleanly:
```python
from sqlmodel import Session
from aimbat.db import engine

with Session(engine) as session:
    # query or modify data here
    pass
```
Changes accumulate in the session and are written to disk only when
session.commit() is called (or when you call a core function that commits
internally). If an exception is raised before committing, the session is rolled
back automatically.
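For example (the event id and the depth edit are purely illustrative):

```python
from sqlmodel import Session
from aimbat.db import engine
from aimbat.models import AimbatEvent

with Session(engine) as session:
    event = session.get(AimbatEvent, 1)  # hypothetical primary key
    if event is not None:
        event.depth = 42.0
    session.commit()  # the change reaches disk only here
    # an exception raised before commit() would have rolled the change back
```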
Creating a Project
This is a one-time operation that creates the schema and the SQLite triggers
that enforce the single-default-event constraint and track modification times.
It raises RuntimeError if the schema already exists.
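A minimal sketch; create_project and the shared engine are used exactly this way in the worked example below:

```python
from aimbat.db import engine
from aimbat.core import create_project

# One-time setup: creates the tables and SQLite triggers.
# Raises RuntimeError if the schema already exists.
create_project(engine)
```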
Adding Data
The central function is add_data_to_project:
```python
from pathlib import Path
from sqlmodel import Session
from aimbat.db import engine
from aimbat.core import add_data_to_project
from aimbat.io import DataType

paths = [Path("ev01_st01.sac"), Path("ev01_st02.sac")]  # your SAC files

with Session(engine) as session:
    add_data_to_project(session, paths, DataType.SAC)
```
The DataType enum controls what is read from each source:
| DataType | What is created |
|---|---|
| SAC | Event + Station + Seismogram |
| JSON_EVENT | Event only (no seismogram) |
| JSON_STATION | Station only (no seismogram) |
JSON formats
Event (DataType.JSON_EVENT):
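```json
{
  "time": "2024-01-12T08:14:33Z",
  "latitude": 37.52,
  "longitude": 143.04,
  "depth": 35.0
}
```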
Station (DataType.JSON_STATION):
```json
{
  "name": "ANMO",
  "network": "IU",
  "location": "00",
  "channel": "BHZ",
  "latitude": 34.946,
  "longitude": -106.457,
  "elevation": 1820.0
}
```
Providing event or station metadata externally
SAC files from some sources omit event or station headers. In that case, add
the metadata separately first and then link the SAC files to the resulting
database records using event_id and station_id:
```python
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.core import add_data_to_project
from aimbat.io import DataType
from aimbat.models import AimbatEvent, AimbatStation

with Session(engine) as session:
    add_data_to_project(session, [event_json], DataType.JSON_EVENT)
    add_data_to_project(session, [station_json], DataType.JSON_STATION)

    event = session.exec(select(AimbatEvent)).one()
    station = session.exec(select(AimbatStation)).one()

    add_data_to_project(
        session,
        sac_files,
        DataType.SAC,
        event_id=event.id,
        station_id=station.id,
    )
```
Worked Example
The script below builds a complete project from scratch. It loads 3 events, 10 stations, and 20 seismograms. The SAC files carry only waveform data; all event and station metadata is supplied via JSON.
"""
Load a project from SAC files that carry no event/station headers.
Layout:
- 3 events
- 10 broadband stations
- 20 seismograms: events 1 and 2 recorded at 7 stations each,
event 3 recorded at 6 stations
"""
import json
from pathlib import Path
from typing import Any
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.core import (
add_data_to_project,
create_project,
create_snapshot,
set_default_event,
)
from aimbat.io import DataType
from aimbat.models import AimbatEvent, AimbatStation
# ------------------------------------------------------------------ #
# Metadata #
# ------------------------------------------------------------------ #
EVENTS: list[dict[str, Any]] = [
{
"time": "2024-01-12T08:14:33Z",
"latitude": 37.52,
"longitude": 143.04,
"depth": 35.0,
},
{
"time": "2024-02-28T21:07:55Z",
"latitude": -23.11,
"longitude": -67.89,
"depth": 120.0,
},
{
"time": "2024-03-09T03:51:20Z",
"latitude": 51.72,
"longitude": 178.35,
"depth": 55.0,
},
]
STATIONS: list[dict[str, Any]] = [
{
"name": "ANMO",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 34.946,
"longitude": -106.457,
"elevation": 1820.0,
},
{
"name": "COLA",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 64.874,
"longitude": -147.861,
"elevation": 84.0,
},
{
"name": "GUMO",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 13.589,
"longitude": 144.868,
"elevation": 74.0,
},
{
"name": "HRV",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 42.506,
"longitude": -71.558,
"elevation": 200.0,
},
{
"name": "MAJO",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 36.536,
"longitude": 138.204,
"elevation": 399.0,
},
{
"name": "MIDW",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 28.216,
"longitude": -177.370,
"elevation": 150.0,
},
{
"name": "POHA",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 19.757,
"longitude": -155.531,
"elevation": 1936.0,
},
{
"name": "SSPA",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 40.636,
"longitude": -77.888,
"elevation": 270.0,
},
{
"name": "TATO",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 24.975,
"longitude": 121.498,
"elevation": 75.0,
},
{
"name": "YSS",
"network": "IU",
"location": "00",
"channel": "BHZ",
"latitude": 46.958,
"longitude": 142.760,
"elevation": 89.0,
},
]
# Which stations recorded each event (indices into STATIONS list).
# 7 + 7 + 6 = 20 seismograms total.
EVENT_STATION_MAP = {
0: [0, 1, 2, 3, 4, 5, 6], # event 1 — 7 seismograms
1: [0, 1, 2, 3, 4, 5, 6], # event 2 — 7 seismograms
2: [0, 1, 2, 3, 4, 5], # event 3 — 6 seismograms
}
# ------------------------------------------------------------------ #
# Helpers #
# ------------------------------------------------------------------ #
def write_json(data: dict, path: Path) -> Path:
path.write_text(json.dumps(data))
return path
def sac_path(event_idx: int, station_idx: int) -> Path:
"""Return the path to the SAC file for a given event/station pair."""
return Path(f"data/ev{event_idx + 1:02d}_st{station_idx + 1:02d}.sac")
# ------------------------------------------------------------------ #
# Main #
# ------------------------------------------------------------------ #
workdir = Path("json_metadata")
workdir.mkdir(exist_ok=True)
# 1. Initialise project
create_project(engine)
with Session(engine) as session:
# 2. Register events from JSON
event_files = [
write_json(ev, workdir / f"event_{i:02d}.json") for i, ev in enumerate(EVENTS)
]
add_data_to_project(session, event_files, DataType.JSON_EVENT)
# 3. Register stations from JSON
station_files = [
write_json(st, workdir / f"station_{i:02d}.json")
for i, st in enumerate(STATIONS)
]
add_data_to_project(session, station_files, DataType.JSON_STATION)
# 4. Retrieve the newly created records
events = session.exec(select(AimbatEvent)).all()
stations = session.exec(select(AimbatStation)).all()
    # Build lookup maps by (time, network+name) so insertion order doesn't matter.
    # isoformat() matches the "T"-separated timestamps in EVENTS; str() would
    # format the datetime with a space instead and the lookups would fail.
    event_by_time = {e.time.isoformat()[:19]: e for e in events}
station_by_key = {(s.network, s.name): s for s in stations}
# 5. Add SAC files, linking each to its pre-registered event and station
for ev_idx, st_indices in EVENT_STATION_MAP.items():
ev_time = EVENTS[ev_idx]["time"][:19]
db_event = event_by_time[ev_time]
for st_idx in st_indices:
st_meta = STATIONS[st_idx]
db_station = station_by_key[(st_meta["network"], st_meta["name"])]
add_data_to_project(
session,
[sac_path(ev_idx, st_idx)],
DataType.SAC,
event_id=db_event.id,
station_id=db_station.id,
)
# 6. Set the event with the most seismograms as the default
events = session.exec(select(AimbatEvent)).all()
default = max(events, key=lambda e: len(e.seismograms))
set_default_event(session, default)
# 7. Snapshot the initial state before any processing
create_snapshot(session, default, comment="initial import")
print("Project ready.")
print(f" Events: {len(events)}")
print(f" Stations: {len(stations)}")
print(f" Default event: {default.id} ({len(default.seismograms)} seismograms)")
Querying the Database
Models can be queried directly using SQLModel's select:
```python
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.models import AimbatEvent, AimbatSeismogram

with Session(engine) as session:
    events = session.exec(select(AimbatEvent)).all()
    for event in events:
        print(f"{event.time} {len(event.seismograms)} seismograms")

    # Filter: seismograms marked as selected
    selected = session.exec(
        select(AimbatSeismogram).where(
            AimbatSeismogram.parameters.has(select=True)  # type: ignore[attr-defined]
        )
    ).all()
```
Deduplicating Events
add_data_to_project deduplicates stations automatically by SEED code
(network, name, location, channel), so importing the same station from
multiple sources never creates duplicate records.
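Importing the same station twice is therefore a no-op the second time, as this sketch illustrates (station_json is a path to a station JSON file, as above):

```python
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.core import add_data_to_project
from aimbat.io import DataType
from aimbat.models import AimbatStation

with Session(engine) as session:
    add_data_to_project(session, [station_json], DataType.JSON_STATION)
    add_data_to_project(session, [station_json], DataType.JSON_STATION)  # reuses the existing record

    assert len(session.exec(select(AimbatStation)).all()) == 1
```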
Events are a different story: they are deduplicated by exact origin time.
When two data sources report the same earthquake with times that differ by a
second or two, they are stored as separate AimbatEvent records. The script
below detects such near-duplicates, merges their seismograms into the record
with the most data, averages the location and depth, and removes the extras.
"""
Deduplicate events that were imported from sources reporting slightly different
origin times for the same earthquake.
Background
----------
``add_data_to_project`` deduplicates stations by SEED code
``(network, name, location, channel)`` — so importing the same station twice,
even with different coordinates, always reuses the existing record. Station
duplicates therefore cannot arise through the normal import path.
Events are deduplicated by exact origin time. When two data sources report
the same earthquake with origin times that differ by a second or two, they are
stored as *separate* ``AimbatEvent`` records. This script finds such
near-duplicate events, merges their seismograms into the canonical record
(the one with the most seismograms), averages the location and depth, then
removes the duplicates.
Run this script *before* starting any processing, and take a snapshot
afterwards so the clean state is recoverable.
"""
from pandas import Timedelta
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.models import AimbatEvent
# Merge events whose origin times differ by less than this value.
TIME_TOLERANCE = Timedelta(seconds=10)
def _mean(values: list[float]) -> float:
return sum(values) / len(values)
def _mean_opt(values: list[float | None]) -> float | None:
clean = [v for v in values if v is not None]
return sum(clean) / len(clean) if clean else None
def deduplicate_events(session: Session, tolerance: Timedelta = TIME_TOLERANCE) -> int:
"""Merge event records whose origin times are within *tolerance*.
Events are sorted by time and clustered greedily: a new cluster begins
whenever the gap to the previous event exceeds *tolerance*.
For each cluster the record with the most seismograms is kept as the
canonical entry; its location and depth are updated to the group mean.
Returns the number of duplicate records removed.
"""
events = sorted(
session.exec(select(AimbatEvent)).all(),
key=lambda e: e.time,
)
# Build clusters of near-simultaneous events.
clusters: list[list[AimbatEvent]] = []
for event in events:
if clusters and event.time - clusters[-1][-1].time <= tolerance:
clusters[-1].append(event)
else:
clusters.append([event])
removed = 0
for cluster in clusters:
if len(cluster) < 2:
continue
canonical = max(cluster, key=lambda e: len(e.seismograms))
duplicates = [e for e in cluster if e.id != canonical.id]
# Set location / depth to the group mean.
canonical.latitude = _mean([e.latitude for e in cluster])
canonical.longitude = _mean([e.longitude for e in cluster])
canonical.depth = _mean_opt([e.depth for e in cluster])
for dup in duplicates:
for seis in list(dup.seismograms):
seis.event_id = canonical.id
session.add(seis)
session.flush() # apply FK changes before deleting the row
session.delete(dup)
removed += 1
session.add(canonical)
session.commit()
return removed
with Session(engine) as session:
n = deduplicate_events(session)
print(f"Removed {n} duplicate event(s).")
Running Alignment
With data loaded and a default event set, run the ICCS and MCCC alignment stages against that event, taking a snapshot after each one:

```python
from sqlmodel import Session
from aimbat.db import engine
from aimbat.core import (
    create_iccs_instance,
    create_snapshot,
    get_default_event,
    run_iccs,
    run_mccc,
)

with Session(engine) as session:
    event = get_default_event(session)
    assert event is not None

    bound = create_iccs_instance(session, event)
    run_iccs(session, bound.iccs, autoflip=True, autoselect=True)
    create_snapshot(session, event, comment="after ICCS")

    run_mccc(session, event, bound.iccs, all_seismograms=False)
    create_snapshot(session, event, comment="after MCCC")
```