__package__ = 'archivebox.index'

import re

from datetime import datetime, timezone
from io import StringIO
from pathlib import Path
from typing import List, Tuple, Iterator

from django.db.models import QuerySet
from django.db import transaction

from .schema import Link
from ..util import enforce_types, parse_date
from ..config import (
    OUTPUT_DIR,
    TAG_SEPARATOR_PATTERN,
    USE_TIMESTAMP_AS_ADDED,
)

### Main Links Index

@enforce_types
def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
    """Lazily yield every Snapshot in the SQL index as a Link object."""
    from core.models import Snapshot

    return (
        Link.from_json(page.as_json(*Snapshot.keys))
        for page in Snapshot.objects.all()
    )
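
# Usage sketch (illustrative only, not part of this module): iterating the main index
# requires Django to already be configured for the active ArchiveBox data directory,
# the same setup the archivebox CLI performs before calling into index code.
#
#   for link in parse_sql_main_index():
#       print(link.timestamp, link.url)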

@enforce_types
def remove_from_sql_main_index(snapshots: QuerySet, atomic: bool=False, out_dir: Path=OUTPUT_DIR) -> None:
    if atomic:
        with transaction.atomic():
            return snapshots.delete()
    return snapshots.delete()
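
# Usage sketch (illustrative only): callers hand in any Snapshot QuerySet, e.g.
#
#   from core.models import Snapshot
#   remove_from_sql_main_index(Snapshot.objects.filter(url__icontains='example.com'), atomic=True)
#
# atomic=True wraps the bulk delete in a single transaction so it's all-or-nothing.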

@enforce_types
def write_link_to_sql_index(link: Link):
    """Create or update the Snapshot row (and its ArchiveResults) for a single Link."""
    from core.models import Snapshot, ArchiveResult
    info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}

    tag_list = list(dict.fromkeys(
        tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
    ))
    info.pop('tags')

    # Use the link's own timestamp as the Snapshot's "added" date. This is useful when
    # importing from RSS or other rich formats like JSON that carry original timestamps,
    # so the web UI can show when each link was added at the source instead of one large
    # batch all sharing the same import date.
    if USE_TIMESTAMP_AS_ADDED:
        try:
            info["added"] = datetime.fromtimestamp(float(info["timestamp"]), tz=timezone.utc)
        except Exception:
            # Silently skip if the timestamp can't be parsed as a float or converted to a datetime
            pass

    try:
        info["timestamp"] = Snapshot.objects.get(url=link.url).timestamp
    except Snapshot.DoesNotExist:
        while Snapshot.objects.filter(timestamp=info["timestamp"]).exists():
            info["timestamp"] = str(float(info["timestamp"]) + 1.0)

    snapshot, _ = Snapshot.objects.update_or_create(url=link.url, defaults=info)
    snapshot.save_tags(tag_list)

    for extractor, entries in link.history.items():
        for entry in entries:
            if isinstance(entry, dict):
                result, _ = ArchiveResult.objects.get_or_create(
                    snapshot_id=snapshot.id,
                    extractor=extractor,
                    start_ts=parse_date(entry['start_ts']),
                    defaults={
                        'end_ts': parse_date(entry['end_ts']),
                        'cmd': entry['cmd'],
                        'output': entry['output'],
                        'cmd_version': entry.get('cmd_version') or 'unknown',
                        'pwd': entry['pwd'],
                        'status': entry['status'],
                    }
                )
            else:
                result, _ = ArchiveResult.objects.update_or_create(
                    snapshot_id=snapshot.id,
                    extractor=extractor,
                    start_ts=parse_date(entry.start_ts),
                    defaults={
                        'end_ts': parse_date(entry.end_ts),
                        'cmd': entry.cmd,
                        'output': entry.output,
                        'cmd_version': entry.cmd_version or 'unknown',
                        'pwd': entry.pwd,
                        'status': entry.status,
                    }
                )

    return snapshot
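
# Timestamp collision sketch: the timestamp is treated as a unique key for Snapshots, so when
# a *new* URL arrives with a timestamp that is already taken (e.g. '1612345678.0', values are
# illustrative), the while loop above bumps it by 1.0 ('1612345679.0', '1612345680.0', ...)
# until a free slot is found. URLs that already exist keep their stored timestamp instead.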


@enforce_types
def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
    for link in links:
        # with transaction.atomic():
        #     write_link_to_sql_index(link)
        write_link_to_sql_index(link)


@enforce_types
def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR) -> None:
    from core.models import Snapshot

    # with transaction.atomic():
    #     try:
    #         snap = Snapshot.objects.get(url=link.url)
    #     except Snapshot.DoesNotExist:
    #         snap = write_link_to_sql_index(link)
    #     snap.title = link.title
    try:
        snap = Snapshot.objects.get(url=link.url)
    except Snapshot.DoesNotExist:
        snap = write_link_to_sql_index(link)
    snap.title = link.title

    tag_list = list(dict.fromkeys(
        tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
    ))

    snap.save()
    snap.save_tags(tag_list)
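
# Tag splitting sketch (assuming a comma-style TAG_SEPARATOR_PATTERN; the pattern itself is
# configurable): a link.tags value like 'news, tech,tech' would be split, stripped, and
# de-duplicated to ['news', 'tech'] before being stored via snap.save_tags().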


@enforce_types
def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
    from django.core.management import call_command
    out = StringIO()
    call_command("showmigrations", list=True, stdout=out)
    out.seek(0)
    migrations = []
    for line in out.readlines():
        if line.strip() and ']' in line:
            status_str, name_str = line.strip().split(']', 1)
            is_applied = 'X' in status_str
            migration_name = name_str.strip()
            migrations.append((is_applied, migration_name))

    return migrations
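
# The `showmigrations --list` output parsed above looks roughly like this
# (migration names are illustrative):
#
#   core
#    [X] 0001_initial
#    [ ] 0002_auto
#
# Lines containing ']' are split into a '[X]'/'[ ]' status and the migration name;
# app-label lines without ']' are skipped.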

@enforce_types
def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
    from django.core.management import call_command
    null, out = StringIO(), StringIO()
    call_command("makemigrations", interactive=False, stdout=null)
    call_command("migrate", interactive=False, stdout=out)
    out.seek(0)

    return [line.strip() for line in out.readlines() if line.strip()]

@enforce_types
def get_admins(out_dir: Path=OUTPUT_DIR) -> QuerySet:
    from django.contrib.auth.models import User
    return User.objects.filter(is_superuser=True)