Mirror of https://github.com/zebrajr/ollama-webui.git, synced 2025-12-06 12:19:46 +01:00
* feat: improve ollama model management experience
This commit introduces several improvements to the Ollama model management modal:
- Adds a cancel button to the model pulling operation, using the existing 'x' button pattern.
- Adds a cancel button to the "Update All" models operation, allowing the user to cancel the update for the currently processing model.
- Cleans up toast notifications when updating all models. A single toast is now shown at the beginning and a summary toast at the end, preventing notification spam.
- Refactors the `ManageOllama.svelte` component to support these new cancellation features.
- Adds tooltips to all buttons in the modal to improve clarity.
- Disables buttons when their corresponding input fields are empty to prevent accidental clicks.
* fix
* i18n: improve Chinese translation
* fix: handle non‑UTF8 chars in third‑party responses without error
* German translation of new strings in i18n
* log web search queries only with level 'debug' instead of 'info'
* Tool calls now only include text and don't include other content like image b64
* fix onedrive
* fix: discovery url
* fix: default permissions not being loaded
* fix: ai hallucination
* fix: non rich text input copy
* refac: rm print statements
* refac: disable direct models from model editors
* refac/fix: do not process xlsx files with azure doc intelligence
* Update pull_request_template.md
* Update generated image translation in DE-de
* added missing Danish translations
* feat(onedrive): Enable search and "My Organization" pivot
* style(onedrive): Formatting fix
* feat: Implement toggling for vertical and horizontal flow layouts
This commit introduces the necessary logic and UI controls to allow users to switch the Flow component layout between vertical and horizontal orientations.
* **`Flow.svelte` Refactoring:**
* Updates logic for calculating level offsets and node positions to consistently respect the current flow orientation.
* Adds a control panel using `<Controls>` and `<SwitchButton>` components.
* Provides user interface elements to easily switch the flow layout between horizontal and vertical orientations.
* build(deps): bump pydantic from 2.11.7 to 2.11.9 in /backend
Bumps [pydantic](https://github.com/pydantic/pydantic) from 2.11.7 to 2.11.9.
- [Release notes](https://github.com/pydantic/pydantic/releases)
- [Changelog](https://github.com/pydantic/pydantic/blob/v2.11.9/HISTORY.md)
- [Commits](https://github.com/pydantic/pydantic/compare/v2.11.7...v2.11.9)
---
updated-dependencies:
- dependency-name: pydantic
dependency-version: 2.11.9
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <support@github.com>
* build(deps): bump black from 25.1.0 to 25.9.0 in /backend
Bumps [black](https://github.com/psf/black) from 25.1.0 to 25.9.0.
- [Release notes](https://github.com/psf/black/releases)
- [Changelog](https://github.com/psf/black/blob/main/CHANGES.md)
- [Commits](https://github.com/psf/black/compare/25.1.0...25.9.0)
---
updated-dependencies:
- dependency-name: black
dependency-version: 25.9.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* build(deps): bump markdown from 3.8.2 to 3.9 in /backend
Bumps [markdown](https://github.com/Python-Markdown/markdown) from 3.8.2 to 3.9.
- [Release notes](https://github.com/Python-Markdown/markdown/releases)
- [Changelog](https://github.com/Python-Markdown/markdown/blob/master/docs/changelog.md)
- [Commits](https://github.com/Python-Markdown/markdown/compare/3.8.2...3.9.0)
---
updated-dependencies:
- dependency-name: markdown
dependency-version: '3.9'
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* build(deps): bump chromadb from 1.0.20 to 1.1.0 in /backend
Bumps [chromadb](https://github.com/chroma-core/chroma) from 1.0.20 to 1.1.0.
- [Release notes](https://github.com/chroma-core/chroma/releases)
- [Changelog](https://github.com/chroma-core/chroma/blob/main/RELEASE_PROCESS.md)
- [Commits](https://github.com/chroma-core/chroma/compare/1.0.20...1.1.0)
---
updated-dependencies:
- dependency-name: chromadb
dependency-version: 1.1.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* build(deps): bump opentelemetry-api from 1.36.0 to 1.37.0
Bumps [opentelemetry-api](https://github.com/open-telemetry/opentelemetry-python) from 1.36.0 to 1.37.0.
- [Release notes](https://github.com/open-telemetry/opentelemetry-python/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-python/blob/main/CHANGELOG.md)
- [Commits](https://github.com/open-telemetry/opentelemetry-python/compare/v1.36.0...v1.37.0)
---
updated-dependencies:
- dependency-name: opentelemetry-api
dependency-version: 1.37.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
* refac: ollama embed form data
* fix: non rich text handling
* fix: oauth client registration
* refac
* chore: dep bump
* chore: fastapi bump
* chore/refac: bump bcrypt and remove passlib
* Improving Korean Translation
* refac
* Improving Korean Translation
* feat: PWA share_target implementation
Co-Authored-By: gjveld <19951982+gjveld@users.noreply.github.com>
* refac: message input mobile detection behaviour
* feat: model_ids per folder
* Update translation.json (pt-BR)
Adds translations for newly added strings.
* refac
* refac
* refac
* refac
* refac/fix: temp chat
* refac
* refac: stop task
* refac/fix: azure audio escape
* refac: external tool validation
* refac/enh: start.sh additional args support
* refac
* refac: styling
* refac/fix: direct connection floating action buttons
* refac/fix: system prompt duplication
* refac/enh: openai tts additional params support
* refac
* feat: load data in parallel to accelerate page loading speed
* i18n: improve Chinese translation
* refac
* refac: model selector
* UPD: i18n es-ES Translation v0.6.33
Updated new strings.
* refac
* improved query perf by querying only relevant columns (see the sketch below)
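A minimal sketch of the column-restriction idea with SQLAlchemy (the `Chat` model below is hypothetical, for illustration only, not the actual model touched by this commit):

```python
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base, load_only

Base = declarative_base()


class Chat(Base):  # hypothetical model, for illustration only
    __tablename__ = "chat"
    id = Column(String, primary_key=True)
    title = Column(String)
    chat = Column(String)  # large payload we want to avoid fetching


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as db:
    # load_only restricts the SELECT to the listed columns, so the large
    # `chat` payload is never fetched when only titles are needed.
    titles = db.query(Chat).options(load_only(Chat.id, Chat.title)).all()
```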
* refac/enh: docling params
* refac
* refac: openai additional headers support
* refac
* FEAT: Add Vega Chart Visualizer Renderer
### FEAT: Add Vega Chart Visualizer Renderer
Feature requested in https://github.com/open-webui/open-webui/discussions/18022
Added the npm vega lib to package.json
Added a visualization renderer function to src/libs/utils/index.ts
Added logic to src/lib/components/chat/Messages/CodeBlock.svelte
The treatment is similar to that for mermaid diagrams.
Reference: https://vega.github.io/vega/
* refac
* chore
* refac
* FEAT: Add Vega-Lite Chart Visualizer Renderer
### FEAT: Add Vega-Lite Chart Visualizer Renderer
Adds support for Vega-Lite specifications.
Vega-Lite is a higher-level grammar that compiles down to a Vega specification, so a Vega-Lite spec has to be compiled before it can be rendered with Vega.
This PR adds that check and compiles when necessary; it complements the recently added Vega renderer feature.
* refac
* refac/fix: switch
* enh/refac: url input handling
* refac
* refac: styling
* UPD: Add Validators & Error Toast for Mermaid & Vega diagrams
### UPD: Feat: Add Validators & Error Toast for Mermaid & Vega diagrams
Description:
Generated or hand-entered diagrams often contain syntax errors and therefore fail to render, and without any notification it is difficult to know what happened.
This PR adds validators and error toast notifications for Mermaid and Vega/Vega-Lite diagrams, helping the user fix them.
* removed redundant knowledge API call
* Fix Code Format
* refac: model workspace view
* refac
* refac: knowledge
* refac: prompts
* refac: tools
* refac
* feat: attach folder
* refac: make tencentcloud-sdk-python optional
* refac/fix: oauth
* enh: ENABLE_OAUTH_EMAIL_FALLBACK
* refac/fix: folders
* Update requirements.txt
* Update pyproject.toml
* UPD: Add Validators & Error Toast for Mermaid & Vega diagrams
### UPD: Feat: Add Validators & Error Toast for Mermaid & Vega diagrams
Description:
Generated or hand-entered diagrams often contain syntax errors and therefore fail to render, and without any notification it is difficult to know what happened.
This PR adds validators and error toast notifications for Mermaid and Vega/Vega-Lite diagrams, helping the user fix them.
Note:
Another possibility for integrating this graph visualizer is through its Svelte component: https://github.com/vega/svelte-vega/tree/main/packages/svelte-vega
* Removed unused toast import & Code Format
* refac
* refac: external tool server view
* refac
* refac: overview
* refac: styling
* refac
* Update bug_report.yaml
* refac
* refac
* refac
* refac
* refac: oauth client fallback
* Fixed: Cannot handle batch sizes > 1 if no padding token is defined
Fixes the "Cannot handle batch sizes > 1 if no padding token is defined" error for reranker models that do not define a padding token in their config, by using the eos_token_id, when present, as the pad_token_id. A sketch of the idea follows below.
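A minimal sketch of that fallback, assuming a Hugging Face transformers sequence-classification reranker (the model id below is hypothetical):

```python
from transformers import AutoModelForSequenceClassification, AutoTokenizer

model_name = "my-org/my-reranker"  # hypothetical model id

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# Fall back to the EOS token when no pad token is defined, so batches of
# size > 1 can be padded to a common length.
if tokenizer.pad_token_id is None and tokenizer.eos_token_id is not None:
    tokenizer.pad_token_id = tokenizer.eos_token_id
    model.config.pad_token_id = tokenizer.eos_token_id

# Score two query/document pairs in one padded batch.
batch = tokenizer(
    [
        ("what is open-webui?", "a self-hosted AI interface"),
        ("what is open-webui?", "a recipe for banana bread"),
    ],
    padding=True,
    truncation=True,
    return_tensors="pt",
)
scores = model(**batch).logits
```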
* refac: fallback to reasoning content
* fix(i18n): corrected typo in Spanish translation for "Reasoning Tags"
Typo fixed in Spanish translation file at line 1240 of `open-webui/src/lib/i18n/locales/es-ES/translation.json`:
- Incorrect: "Eriquetas de Razonamiento"
- Correct: "Etiquetas de Razonamiento"
This improves clarity and consistency in the UI.
* refac/fix: ENABLE_STAR_SESSIONS_MIDDLEWARE
* refac/fix: redirect
* refac
* refac
* refac
* refac: web search error handling
* refac: source parsing
* refac: functions
* refac
* refac/enh: note pdf export
* refac/fix: mcp oauth2.1
* chore: format
* chore: Changelog (#17995)
* Update CHANGELOG.md (27 consecutive commits with this message)
* refac
* chore: dep bump
---------
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: silentoplayz <jacwoo21@outlook.com>
Co-authored-by: Shirasawa <764798966@qq.com>
Co-authored-by: Jan Kessler <jakessle@uni-mainz.de>
Co-authored-by: Jacob Leksan <jacob.leksan@expedient.com>
Co-authored-by: Classic298 <27028174+Classic298@users.noreply.github.com>
Co-authored-by: sinejespersen <sinejespersen@protonmail.com>
Co-authored-by: Selene Blok <selene.blok@rws.nl>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Cyp <cypher9715@naver.com>
Co-authored-by: gjveld <19951982+gjveld@users.noreply.github.com>
Co-authored-by: joaoback <156559121+joaoback@users.noreply.github.com>
Co-authored-by: _00_ <131402327+rgaricano@users.noreply.github.com>
Co-authored-by: expruc <eygabi01@gmail.com>
Co-authored-by: YetheSamartaka <55753928+YetheSamartaka@users.noreply.github.com>
Co-authored-by: Akutangulo <akutangulo@gmail.com>
787 lines
23 KiB
Python
import asyncio
import random

import socketio
import logging
import sys
import time

from typing import Dict, Set
from redis import asyncio as aioredis

import pycrdt as Y

from open_webui.models.users import Users, UserNameResponse
from open_webui.models.channels import Channels
from open_webui.models.chats import Chats
from open_webui.models.notes import Notes, NoteUpdateForm
from open_webui.utils.redis import (
    get_sentinels_from_env,
    get_sentinel_url_from_env,
)

from open_webui.env import (
    ENABLE_WEBSOCKET_SUPPORT,
    WEBSOCKET_MANAGER,
    WEBSOCKET_REDIS_URL,
    WEBSOCKET_REDIS_CLUSTER,
    WEBSOCKET_REDIS_LOCK_TIMEOUT,
    WEBSOCKET_SENTINEL_PORT,
    WEBSOCKET_SENTINEL_HOSTS,
    REDIS_KEY_PREFIX,
)
from open_webui.utils.auth import decode_token
from open_webui.socket.utils import RedisDict, RedisLock, YdocManager
from open_webui.tasks import create_task, stop_item_tasks
from open_webui.utils.redis import get_redis_connection
from open_webui.utils.access_control import has_access, get_users_with_access


from open_webui.env import (
    GLOBAL_LOG_LEVEL,
    SRC_LOG_LEVELS,
)


logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["SOCKET"])

REDIS = None

if WEBSOCKET_MANAGER == "redis":
    if WEBSOCKET_SENTINEL_HOSTS:
        mgr = socketio.AsyncRedisManager(
            get_sentinel_url_from_env(
                WEBSOCKET_REDIS_URL, WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT
            )
        )
    else:
        mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL)

    sio = socketio.AsyncServer(
        cors_allowed_origins=[],
        async_mode="asgi",
        transports=(["websocket"] if ENABLE_WEBSOCKET_SUPPORT else ["polling"]),
        allow_upgrades=ENABLE_WEBSOCKET_SUPPORT,
        always_connect=True,
        client_manager=mgr,
    )
else:
    sio = socketio.AsyncServer(
        cors_allowed_origins=[],
        async_mode="asgi",
        transports=(["websocket"] if ENABLE_WEBSOCKET_SUPPORT else ["polling"]),
        allow_upgrades=ENABLE_WEBSOCKET_SUPPORT,
        always_connect=True,
    )


# Timeout duration in seconds
TIMEOUT_DURATION = 3

# Dictionary to maintain the user pool
if WEBSOCKET_MANAGER == "redis":
    log.debug("Using Redis to manage websockets.")
    REDIS = get_redis_connection(
        redis_url=WEBSOCKET_REDIS_URL,
        redis_sentinels=get_sentinels_from_env(
            WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT
        ),
        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
        async_mode=True,
    )

    redis_sentinels = get_sentinels_from_env(
        WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT
    )
    SESSION_POOL = RedisDict(
        f"{REDIS_KEY_PREFIX}:session_pool",
        redis_url=WEBSOCKET_REDIS_URL,
        redis_sentinels=redis_sentinels,
        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
    )
    USER_POOL = RedisDict(
        f"{REDIS_KEY_PREFIX}:user_pool",
        redis_url=WEBSOCKET_REDIS_URL,
        redis_sentinels=redis_sentinels,
        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
    )
    USAGE_POOL = RedisDict(
        f"{REDIS_KEY_PREFIX}:usage_pool",
        redis_url=WEBSOCKET_REDIS_URL,
        redis_sentinels=redis_sentinels,
        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
    )

    clean_up_lock = RedisLock(
        redis_url=WEBSOCKET_REDIS_URL,
        lock_name=f"{REDIS_KEY_PREFIX}:usage_cleanup_lock",
        timeout_secs=WEBSOCKET_REDIS_LOCK_TIMEOUT,
        redis_sentinels=redis_sentinels,
        redis_cluster=WEBSOCKET_REDIS_CLUSTER,
    )
    aquire_func = clean_up_lock.aquire_lock
    renew_func = clean_up_lock.renew_lock
    release_func = clean_up_lock.release_lock
else:
    SESSION_POOL = {}
    USER_POOL = {}
    USAGE_POOL = {}

    aquire_func = release_func = renew_func = lambda: True


YDOC_MANAGER = YdocManager(
    redis=REDIS,
    redis_key_prefix=f"{REDIS_KEY_PREFIX}:ydoc:documents",
)

async def periodic_usage_pool_cleanup():
    max_retries = 2
    retry_delay = random.uniform(
        WEBSOCKET_REDIS_LOCK_TIMEOUT / 2, WEBSOCKET_REDIS_LOCK_TIMEOUT
    )
    for attempt in range(max_retries + 1):
        if aquire_func():
            break
        else:
            if attempt < max_retries:
                log.debug(
                    f"Cleanup lock already exists. Retry {attempt + 1} after {retry_delay}s..."
                )
                await asyncio.sleep(retry_delay)
            else:
                log.warning(
                    "Failed to acquire cleanup lock after retries. Skipping cleanup."
                )
                return

    log.debug("Running periodic_cleanup")
    try:
        while True:
            if not renew_func():
                log.error("Unable to renew cleanup lock. Exiting usage pool cleanup.")
                raise Exception("Unable to renew usage pool cleanup lock.")

            now = int(time.time())
            send_usage = False
            for model_id, connections in list(USAGE_POOL.items()):
                # Creating a list of sids to remove if they have timed out
                expired_sids = [
                    sid
                    for sid, details in connections.items()
                    if now - details["updated_at"] > TIMEOUT_DURATION
                ]

                for sid in expired_sids:
                    del connections[sid]

                if not connections:
                    log.debug(f"Cleaning up model {model_id} from usage pool")
                    del USAGE_POOL[model_id]
                else:
                    USAGE_POOL[model_id] = connections

                send_usage = True
            await asyncio.sleep(TIMEOUT_DURATION)
    finally:
        release_func()


app = socketio.ASGIApp(
    sio,
    socketio_path="/ws/socket.io",
)

def get_models_in_use():
    # List models that are currently in use
    models_in_use = list(USAGE_POOL.keys())
    return models_in_use


def get_active_user_ids():
    """Get the list of active user IDs."""
    return list(USER_POOL.keys())


def get_user_active_status(user_id):
    """Check if a user is currently active."""
    return user_id in USER_POOL


def get_user_id_from_session_pool(sid):
    user = SESSION_POOL.get(sid)
    if user:
        return user["id"]
    return None


def get_session_ids_from_room(room):
    """Get all session IDs from a specific room."""
    active_session_ids = sio.manager.get_participants(
        namespace="/",
        room=room,
    )
    return [session_id[0] for session_id in active_session_ids]


def get_user_ids_from_room(room):
    active_session_ids = get_session_ids_from_room(room)

    active_user_ids = list(
        set([SESSION_POOL.get(session_id)["id"] for session_id in active_session_ids])
    )
    return active_user_ids


def get_active_status_by_user_id(user_id):
    if user_id in USER_POOL:
        return True
    return False

@sio.on("usage")
async def usage(sid, data):
    if sid in SESSION_POOL:
        model_id = data["model"]
        # Record the timestamp for the last update
        current_time = int(time.time())

        # Store the new usage data and task
        USAGE_POOL[model_id] = {
            **(USAGE_POOL[model_id] if model_id in USAGE_POOL else {}),
            sid: {"updated_at": current_time},
        }

@sio.event
async def connect(sid, environ, auth):
    user = None
    if auth and "token" in auth:
        data = decode_token(auth["token"])

        if data is not None and "id" in data:
            user = Users.get_user_by_id(data["id"])

        if user:
            SESSION_POOL[sid] = user.model_dump(
                exclude=["date_of_birth", "bio", "gender"]
            )
            if user.id in USER_POOL:
                USER_POOL[user.id] = USER_POOL[user.id] + [sid]
            else:
                USER_POOL[user.id] = [sid]

@sio.on("user-join")
async def user_join(sid, data):
    auth = data["auth"] if "auth" in data else None
    if not auth or "token" not in auth:
        return

    data = decode_token(auth["token"])
    if data is None or "id" not in data:
        return

    user = Users.get_user_by_id(data["id"])
    if not user:
        return

    SESSION_POOL[sid] = user.model_dump(exclude=["date_of_birth", "bio", "gender"])
    if user.id in USER_POOL:
        USER_POOL[user.id] = USER_POOL[user.id] + [sid]
    else:
        USER_POOL[user.id] = [sid]

    # Join all the channels
    channels = Channels.get_channels_by_user_id(user.id)
    log.debug(f"{channels=}")
    for channel in channels:
        await sio.enter_room(sid, f"channel:{channel.id}")
    return {"id": user.id, "name": user.name}

@sio.on("join-channels")
async def join_channel(sid, data):
    auth = data["auth"] if "auth" in data else None
    if not auth or "token" not in auth:
        return

    data = decode_token(auth["token"])
    if data is None or "id" not in data:
        return

    user = Users.get_user_by_id(data["id"])
    if not user:
        return

    # Join all the channels
    channels = Channels.get_channels_by_user_id(user.id)
    log.debug(f"{channels=}")
    for channel in channels:
        await sio.enter_room(sid, f"channel:{channel.id}")

@sio.on("join-note")
async def join_note(sid, data):
    auth = data["auth"] if "auth" in data else None
    if not auth or "token" not in auth:
        return

    token_data = decode_token(auth["token"])
    if token_data is None or "id" not in token_data:
        return

    user = Users.get_user_by_id(token_data["id"])
    if not user:
        return

    note = Notes.get_note_by_id(data["note_id"])
    if not note:
        log.error(f"Note {data['note_id']} not found for user {user.id}")
        return

    if (
        user.role != "admin"
        and user.id != note.user_id
        and not has_access(user.id, type="read", access_control=note.access_control)
    ):
        log.error(f"User {user.id} does not have access to note {data['note_id']}")
        return

    log.debug(f"Joining note {note.id} for user {user.id}")
    await sio.enter_room(sid, f"note:{note.id}")

@sio.on("events:channel")
async def channel_events(sid, data):
    room = f"channel:{data['channel_id']}"
    participants = sio.manager.get_participants(
        namespace="/",
        room=room,
    )

    sids = [sid for sid, _ in participants]
    if sid not in sids:
        return

    event_data = data["data"]
    event_type = event_data["type"]

    if event_type == "typing":
        await sio.emit(
            "events:channel",
            {
                "channel_id": data["channel_id"],
                "message_id": data.get("message_id", None),
                "data": event_data,
                "user": UserNameResponse(**SESSION_POOL[sid]).model_dump(),
            },
            room=room,
        )

@sio.on("ydoc:document:join")
async def ydoc_document_join(sid, data):
    """Handle user joining a document"""
    user = SESSION_POOL.get(sid)

    try:
        document_id = data["document_id"]

        if document_id.startswith("note:"):
            note_id = document_id.split(":")[1]
            note = Notes.get_note_by_id(note_id)
            if not note:
                log.error(f"Note {note_id} not found")
                return

            if (
                user.get("role") != "admin"
                and user.get("id") != note.user_id
                and not has_access(
                    user.get("id"), type="read", access_control=note.access_control
                )
            ):
                log.error(
                    f"User {user.get('id')} does not have access to note {note_id}"
                )
                return

        user_id = data.get("user_id", sid)
        user_name = data.get("user_name", "Anonymous")
        user_color = data.get("user_color", "#000000")

        log.info(f"User {user_id} joining document {document_id}")
        await YDOC_MANAGER.add_user(document_id=document_id, user_id=sid)

        # Join Socket.IO room
        await sio.enter_room(sid, f"doc_{document_id}")

        active_session_ids = get_session_ids_from_room(f"doc_{document_id}")

        # Get the Yjs document state
        ydoc = Y.Doc()
        updates = await YDOC_MANAGER.get_updates(document_id)
        for update in updates:
            ydoc.apply_update(bytes(update))

        # Encode the entire document state as an update
        state_update = ydoc.get_update()
        await sio.emit(
            "ydoc:document:state",
            {
                "document_id": document_id,
                "state": list(state_update),  # Convert bytes to list for JSON
                "sessions": active_session_ids,
            },
            room=sid,
        )

        # Notify other users about the new user
        await sio.emit(
            "ydoc:user:joined",
            {
                "document_id": document_id,
                "user_id": user_id,
                "user_name": user_name,
                "user_color": user_color,
            },
            room=f"doc_{document_id}",
            skip_sid=sid,
        )

        log.info(f"User {user_id} successfully joined document {document_id}")

    except Exception as e:
        log.error(f"Error in yjs_document_join: {e}")
        await sio.emit("error", {"message": "Failed to join document"}, room=sid)

async def document_save_handler(document_id, data, user):
    if document_id.startswith("note:"):
        note_id = document_id.split(":")[1]
        note = Notes.get_note_by_id(note_id)
        if not note:
            log.error(f"Note {note_id} not found")
            return

        if (
            user.get("role") != "admin"
            and user.get("id") != note.user_id
            and not has_access(
                user.get("id"), type="read", access_control=note.access_control
            )
        ):
            log.error(f"User {user.get('id')} does not have access to note {note_id}")
            return

        Notes.update_note_by_id(note_id, NoteUpdateForm(data=data))

@sio.on("ydoc:document:state")
async def yjs_document_state(sid, data):
    """Send the current state of the Yjs document to the user"""
    try:
        document_id = data["document_id"]
        room = f"doc_{document_id}"

        active_session_ids = get_session_ids_from_room(room)

        if sid not in active_session_ids:
            log.warning(f"Session {sid} not in room {room}. Cannot send state.")
            return

        if not await YDOC_MANAGER.document_exists(document_id):
            log.warning(f"Document {document_id} not found")
            return

        # Get the Yjs document state
        ydoc = Y.Doc()
        updates = await YDOC_MANAGER.get_updates(document_id)
        for update in updates:
            ydoc.apply_update(bytes(update))

        # Encode the entire document state as an update
        state_update = ydoc.get_update()

        await sio.emit(
            "ydoc:document:state",
            {
                "document_id": document_id,
                "state": list(state_update),  # Convert bytes to list for JSON
                "sessions": active_session_ids,
            },
            room=sid,
        )
    except Exception as e:
        log.error(f"Error in yjs_document_state: {e}")

@sio.on("ydoc:document:update")
async def yjs_document_update(sid, data):
    """Handle Yjs document updates"""
    try:
        document_id = data["document_id"]

        # Cancel any pending debounced save for this document
        try:
            await stop_item_tasks(REDIS, document_id)
        except Exception:
            pass

        user_id = data.get("user_id", sid)

        update = data["update"]  # list of ints (byte values) from the frontend

        await YDOC_MANAGER.append_to_updates(
            document_id=document_id,
            update=update,
        )

        # Broadcast update to all other users in the document
        await sio.emit(
            "ydoc:document:update",
            {
                "document_id": document_id,
                "user_id": user_id,
                "update": update,
                "socket_id": sid,  # Add socket_id to match frontend filtering
            },
            room=f"doc_{document_id}",
            skip_sid=sid,
        )

        async def debounced_save():
            await asyncio.sleep(0.5)
            await document_save_handler(
                document_id, data.get("data", {}), SESSION_POOL.get(sid)
            )

        if data.get("data"):
            await create_task(REDIS, debounced_save(), document_id)

    except Exception as e:
        log.error(f"Error in yjs_document_update: {e}")

@sio.on("ydoc:document:leave")
async def yjs_document_leave(sid, data):
    """Handle user leaving a document"""
    try:
        document_id = data["document_id"]
        user_id = data.get("user_id", sid)

        log.info(f"User {user_id} leaving document {document_id}")

        # Remove user from the document
        await YDOC_MANAGER.remove_user(document_id=document_id, user_id=sid)

        # Leave Socket.IO room
        await sio.leave_room(sid, f"doc_{document_id}")

        # Notify other users
        await sio.emit(
            "ydoc:user:left",
            {"document_id": document_id, "user_id": user_id},
            room=f"doc_{document_id}",
        )

        if (
            await YDOC_MANAGER.document_exists(document_id)
            and len(await YDOC_MANAGER.get_users(document_id)) == 0
        ):
            log.info(f"Cleaning up document {document_id} as no users are left")
            await YDOC_MANAGER.clear_document(document_id)

    except Exception as e:
        log.error(f"Error in yjs_document_leave: {e}")

@sio.on("ydoc:awareness:update")
async def yjs_awareness_update(sid, data):
    """Handle awareness updates (cursors, selections, etc.)"""
    try:
        document_id = data["document_id"]
        user_id = data.get("user_id", sid)
        update = data["update"]

        # Broadcast awareness update to all other users in the document
        await sio.emit(
            "ydoc:awareness:update",
            {"document_id": document_id, "user_id": user_id, "update": update},
            room=f"doc_{document_id}",
            skip_sid=sid,
        )

    except Exception as e:
        log.error(f"Error in yjs_awareness_update: {e}")

@sio.event
async def disconnect(sid):
    if sid in SESSION_POOL:
        user = SESSION_POOL[sid]
        del SESSION_POOL[sid]

        user_id = user["id"]
        USER_POOL[user_id] = [_sid for _sid in USER_POOL[user_id] if _sid != sid]

        if len(USER_POOL[user_id]) == 0:
            del USER_POOL[user_id]

        await YDOC_MANAGER.remove_user_from_all_documents(sid)
    else:
        pass
        # print(f"Unknown session ID {sid} disconnected")

def get_event_emitter(request_info, update_db=True):
    async def __event_emitter__(event_data):
        user_id = request_info["user_id"]

        session_ids = list(
            set(
                USER_POOL.get(user_id, [])
                + (
                    [request_info.get("session_id")]
                    if request_info.get("session_id")
                    else []
                )
            )
        )

        chat_id = request_info.get("chat_id", None)
        message_id = request_info.get("message_id", None)

        emit_tasks = [
            sio.emit(
                "events",
                {
                    "chat_id": chat_id,
                    "message_id": message_id,
                    "data": event_data,
                },
                to=session_id,
            )
            for session_id in session_ids
        ]

        await asyncio.gather(*emit_tasks)

        if (
            update_db
            and message_id
            and not request_info.get("chat_id", "").startswith("local:")
        ):
            if "type" in event_data and event_data["type"] == "status":
                Chats.add_message_status_to_chat_by_id_and_message_id(
                    request_info["chat_id"],
                    request_info["message_id"],
                    event_data.get("data", {}),
                )

            if "type" in event_data and event_data["type"] == "message":
                message = Chats.get_message_by_id_and_message_id(
                    request_info["chat_id"],
                    request_info["message_id"],
                )

                if message:
                    content = message.get("content", "")
                    content += event_data.get("data", {}).get("content", "")

                    Chats.upsert_message_to_chat_by_id_and_message_id(
                        request_info["chat_id"],
                        request_info["message_id"],
                        {
                            "content": content,
                        },
                    )

            if "type" in event_data and event_data["type"] == "replace":
                content = event_data.get("data", {}).get("content", "")

                Chats.upsert_message_to_chat_by_id_and_message_id(
                    request_info["chat_id"],
                    request_info["message_id"],
                    {
                        "content": content,
                    },
                )

            if "type" in event_data and event_data["type"] == "embeds":
                message = Chats.get_message_by_id_and_message_id(
                    request_info["chat_id"],
                    request_info["message_id"],
                )

                embeds = event_data.get("data", {}).get("embeds", [])
                embeds.extend(message.get("embeds", []))

                Chats.upsert_message_to_chat_by_id_and_message_id(
                    request_info["chat_id"],
                    request_info["message_id"],
                    {
                        "embeds": embeds,
                    },
                )

            if "type" in event_data and event_data["type"] == "files":
                message = Chats.get_message_by_id_and_message_id(
                    request_info["chat_id"],
                    request_info["message_id"],
                )

                files = event_data.get("data", {}).get("files", [])
                files.extend(message.get("files", []))

                Chats.upsert_message_to_chat_by_id_and_message_id(
                    request_info["chat_id"],
                    request_info["message_id"],
                    {
                        "files": files,
                    },
                )

            if event_data.get("type") in ["source", "citation"]:
                data = event_data.get("data", {})
                if data.get("type") is None:
                    message = Chats.get_message_by_id_and_message_id(
                        request_info["chat_id"],
                        request_info["message_id"],
                    )

                    sources = message.get("sources", [])
                    sources.append(data)

                    Chats.upsert_message_to_chat_by_id_and_message_id(
                        request_info["chat_id"],
                        request_info["message_id"],
                        {
                            "sources": sources,
                        },
                    )

    return __event_emitter__

def get_event_call(request_info):
    async def __event_caller__(event_data):
        response = await sio.call(
            "events",
            {
                "chat_id": request_info.get("chat_id", None),
                "message_id": request_info.get("message_id", None),
                "data": event_data,
            },
            to=request_info["session_id"],
        )
        return response

    return __event_caller__


get_event_caller = get_event_call
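A minimal usage sketch for `get_event_emitter` (the `request_info` values below are hypothetical; real callers build them from an authenticated request):

```python
import asyncio


async def demo():
    # Hypothetical IDs for illustration only.
    request_info = {
        "user_id": "user-123",
        "session_id": "sid-abc",
        "chat_id": "chat-456",
        "message_id": "msg-789",
    }
    event_emitter = get_event_emitter(request_info)

    # Emits an "events" payload to every active session of the user; since
    # the event type is "status", it is also persisted to the chat record.
    await event_emitter(
        {
            "type": "status",
            "data": {"description": "Searching the web...", "done": False},
        }
    )
```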