feat: partner sync (#16424)

feat: partner CUD sync
This commit is contained in:
Zack Pollard 2025-03-03 11:05:30 +00:00 committed by GitHub
parent 869839f642
commit fe702ba6d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 375 additions and 8 deletions

BIN
mobile/openapi/README.md generated

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -12052,13 +12052,50 @@
"SyncEntityType": {
"enum": [
"UserV1",
"UserDeleteV1"
"UserDeleteV1",
"PartnerV1",
"PartnerDeleteV1"
],
"type": "string"
},
"SyncPartnerDeleteV1": {
"properties": {
"sharedById": {
"type": "string"
},
"sharedWithId": {
"type": "string"
}
},
"required": [
"sharedById",
"sharedWithId"
],
"type": "object"
},
"SyncPartnerV1": {
"properties": {
"inTimeline": {
"type": "boolean"
},
"sharedById": {
"type": "string"
},
"sharedWithId": {
"type": "string"
}
},
"required": [
"inTimeline",
"sharedById",
"sharedWithId"
],
"type": "object"
},
"SyncRequestType": {
"enum": [
"UsersV1"
"UsersV1",
"PartnersV1"
],
"type": "string"
},

View File

@ -3645,10 +3645,13 @@ export enum Error2 {
}
export enum SyncEntityType {
UserV1 = "UserV1",
UserDeleteV1 = "UserDeleteV1"
UserDeleteV1 = "UserDeleteV1",
PartnerV1 = "PartnerV1",
PartnerDeleteV1 = "PartnerDeleteV1"
}
export enum SyncRequestType {
UsersV1 = "UsersV1"
UsersV1 = "UsersV1",
PartnersV1 = "PartnersV1"
}
export enum TranscodeHWAccel {
Nvenc = "nvenc",

9
server/src/db.d.ts vendored
View File

@ -272,6 +272,13 @@ export interface NaturalearthCountries {
type: string;
}
export interface PartnersAudit {
deletedAt: Generated<Timestamp>;
id: Generated<string>;
sharedById: string;
sharedWithId: string;
}
export interface Partners {
createdAt: Generated<Timestamp>;
inTimeline: Generated<boolean>;
@ -316,7 +323,6 @@ export interface SessionSyncCheckpoints {
updateId: Generated<string>;
}
export interface SharedLinkAsset {
assetsId: string;
sharedLinksId: string;
@ -462,6 +468,7 @@ export interface DB {
migrations: Migrations;
move_history: MoveHistory;
naturalearth_countries: NaturalearthCountries;
partners_audit: PartnersAudit;
partners: Partners;
person: Person;
sessions: Sessions;

View File

@ -45,15 +45,30 @@ export class SyncUserDeleteV1 {
userId!: string;
}
export class SyncPartnerV1 {
sharedById!: string;
sharedWithId!: string;
inTimeline!: boolean;
}
export class SyncPartnerDeleteV1 {
sharedById!: string;
sharedWithId!: string;
}
export type SyncItem = {
[SyncEntityType.UserV1]: SyncUserV1;
[SyncEntityType.UserDeleteV1]: SyncUserDeleteV1;
[SyncEntityType.PartnerV1]: SyncPartnerV1;
[SyncEntityType.PartnerDeleteV1]: SyncPartnerDeleteV1;
};
const responseDtos = [
//
SyncUserV1,
SyncUserDeleteV1,
SyncPartnerV1,
SyncPartnerDeleteV1,
];
export const extraSyncModels = responseDtos;

View File

@ -0,0 +1,19 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryColumn } from 'typeorm';
@Entity('partners_audit')
export class PartnerAuditEntity {
@PrimaryColumn({ type: 'uuid', nullable: false, default: () => 'immich_uuid_v7()' })
id!: string;
@Index('IDX_partners_audit_shared_by_id')
@Column({ type: 'uuid' })
sharedById!: string;
@Index('IDX_partners_audit_shared_with_id')
@Column({ type: 'uuid' })
sharedWithId!: string;
@Index('IDX_partners_audit_deleted_at')
@CreateDateColumn({ type: 'timestamptz', default: () => 'clock_timestamp()' })
deletedAt!: Date;
}

View File

@ -548,9 +548,12 @@ export enum DatabaseLock {
export enum SyncRequestType {
UsersV1 = 'UsersV1',
PartnersV1 = 'PartnersV1',
}
export enum SyncEntityType {
UserV1 = 'UserV1',
UserDeleteV1 = 'UserDeleteV1',
PartnerV1 = 'PartnerV1',
PartnerDeleteV1 = 'PartnerDeleteV1',
}

View File

@ -0,0 +1,38 @@
import { MigrationInterface, QueryRunner } from "typeorm";
export class CreatePartnersAuditTable1740739778549 implements MigrationInterface {
name = 'CreatePartnersAuditTable1740739778549'
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "partners_audit" ("id" uuid NOT NULL DEFAULT immich_uuid_v7(), "sharedById" uuid NOT NULL, "sharedWithId" uuid NOT NULL, "deletedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp(), CONSTRAINT "PK_952b50217ff78198a7e380f0359" PRIMARY KEY ("id"))`);
await queryRunner.query(`CREATE INDEX "IDX_partners_audit_shared_by_id" ON "partners_audit" ("sharedById") `);
await queryRunner.query(`CREATE INDEX "IDX_partners_audit_shared_with_id" ON "partners_audit" ("sharedWithId") `);
await queryRunner.query(`CREATE INDEX "IDX_partners_audit_deleted_at" ON "partners_audit" ("deletedAt") `);
await queryRunner.query(`CREATE OR REPLACE FUNCTION partners_delete_audit() RETURNS TRIGGER AS
$$
BEGIN
INSERT INTO partners_audit ("sharedById", "sharedWithId")
SELECT "sharedById", "sharedWithId"
FROM OLD;
RETURN NULL;
END;
$$ LANGUAGE plpgsql`
);
await queryRunner.query(`CREATE OR REPLACE TRIGGER partners_delete_audit
AFTER DELETE ON partners
REFERENCING OLD TABLE AS OLD
FOR EACH STATEMENT
EXECUTE FUNCTION partners_delete_audit();
`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "public"."IDX_partners_audit_deleted_at"`);
await queryRunner.query(`DROP INDEX "public"."IDX_partners_audit_shared_with_id"`);
await queryRunner.query(`DROP INDEX "public"."IDX_partners_audit_shared_by_id"`);
await queryRunner.query(`DROP TRIGGER partners_delete_audit`);
await queryRunner.query(`DROP FUNCTION partners_delete_audit`);
await queryRunner.query(`DROP TABLE "partners_audit"`);
}
}

View File

@ -56,4 +56,26 @@ export class SyncRepository {
.orderBy(['id asc'])
.stream();
}
getPartnerUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('partners')
.select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId'])
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['updateId asc'])
.stream();
}
getPartnerDeletes(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('partners_audit')
.select(['id', 'sharedById', 'sharedWithId'])
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['id asc'])
.stream();
}
}

View File

@ -25,6 +25,7 @@ const FULL_SYNC = { needsFullSync: true, deleted: [], upserted: [] };
const SYNC_TYPES_ORDER = [
//
SyncRequestType.UsersV1,
SyncRequestType.PartnersV1,
];
const throwSessionRequired = () => {
@ -81,8 +82,6 @@ export class SyncService extends BaseService {
checkpoints.map(({ type, ack }) => [type, fromAck(ack)]),
);
// TODO pre-filter/sort list based on optimal sync order
for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) {
switch (type) {
case SyncRequestType.UsersV1: {
@ -99,6 +98,23 @@ export class SyncService extends BaseService {
break;
}
case SyncRequestType.PartnersV1: {
const deletes = this.syncRepository.getPartnerDeletes(
auth.user.id,
checkpointMap[SyncEntityType.PartnerDeleteV1],
);
for await (const { id, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.PartnerDeleteV1, updateId: id, data }));
}
const upserts = this.syncRepository.getPartnerUpserts(auth.user.id, checkpointMap[SyncEntityType.PartnerV1]);
for await (const { updateId, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.PartnerV1, updateId, data }));
}
break;
}
default: {
this.logger.warn(`Unsupported sync type: ${type}`);
break;

View File

@ -1,11 +1,12 @@
import { Insertable, Kysely } from 'kysely';
import { randomBytes, randomUUID } from 'node:crypto';
import { Writable } from 'node:stream';
import { Assets, DB, Sessions, Users } from 'src/db';
import { Assets, DB, Partners, Sessions, Users } from 'src/db';
import { AuthDto } from 'src/dtos/auth.dto';
import { AssetType } from 'src/enum';
import { AlbumRepository } from 'src/repositories/album.repository';
import { AssetRepository } from 'src/repositories/asset.repository';
import { PartnerRepository } from 'src/repositories/partner.repository';
import { SessionRepository } from 'src/repositories/session.repository';
import { SyncRepository } from 'src/repositories/sync.repository';
import { UserRepository } from 'src/repositories/user.repository';
@ -30,6 +31,7 @@ class CustomWritable extends Writable {
type Asset = Insertable<Assets>;
type User = Partial<Insertable<Users>>;
type Session = Omit<Insertable<Sessions>, 'token'> & { token?: string };
type Partner = Insertable<Partners>;
export const newUuid = () => randomUUID() as string;
@ -37,6 +39,7 @@ export class TestFactory {
private assets: Asset[] = [];
private sessions: Session[] = [];
private users: User[] = [];
private partners: Partner[] = [];
private constructor(private context: TestContext) {}
@ -100,6 +103,17 @@ export class TestFactory {
};
}
static partner(partner: Partner) {
const defaults = {
inTimeline: true,
};
return {
...defaults,
...partner,
};
}
withAsset(asset: Asset) {
this.assets.push(asset);
return this;
@ -115,6 +129,11 @@ export class TestFactory {
return this;
}
withPartner(partner: Partner) {
this.partners.push(partner);
return this;
}
async create() {
for (const asset of this.assets) {
await this.context.createAsset(asset);
@ -124,6 +143,10 @@ export class TestFactory {
await this.context.createUser(user);
}
for (const partner of this.partners) {
await this.context.createPartner(partner);
}
for (const session of this.sessions) {
await this.context.createSession(session);
}
@ -138,6 +161,7 @@ export class TestContext {
albumRepository: AlbumRepository;
sessionRepository: SessionRepository;
syncRepository: SyncRepository;
partnerRepository: PartnerRepository;
private constructor(private db: Kysely<DB>) {
this.userRepository = new UserRepository(this.db);
@ -145,6 +169,7 @@ export class TestContext {
this.albumRepository = new AlbumRepository(this.db);
this.sessionRepository = new SessionRepository(this.db);
this.syncRepository = new SyncRepository(this.db);
this.partnerRepository = new PartnerRepository(this.db);
}
static from(db: Kysely<DB>) {
@ -159,6 +184,10 @@ export class TestContext {
return this.userRepository.create(TestFactory.user(user));
}
createPartner(partner: Partner) {
return this.partnerRepository.create(TestFactory.partner(partner));
}
createAsset(asset: Asset) {
return this.assetRepository.create(TestFactory.asset(asset));
}

View File

@ -17,6 +17,8 @@ const setup = async () => {
const testSync = async (auth: AuthDto, types: SyncRequestType[]) => {
const stream = TestFactory.stream();
// Wait for 1ms to ensure all updates are available
await new Promise((resolve) => setTimeout(resolve, 1));
await sut.stream(auth, stream, { types });
return stream.getResponse();
@ -186,4 +188,178 @@ describe(SyncService.name, () => {
);
});
});
describe.concurrent('partners', () => {
it('should detect and sync the first partner', async () => {
const { auth, context, sut, testSync } = await setup();
const user1 = auth.user;
const user2 = await context.createUser();
const partner = await context.createPartner({ sharedById: user2.id, sharedWithId: user1.id });
const initialSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]);
expect(initialSyncResponse).toHaveLength(1);
expect(initialSyncResponse).toEqual(
expect.arrayContaining([
{
ack: expect.any(String),
data: {
inTimeline: partner.inTimeline,
sharedById: partner.sharedById,
sharedWithId: partner.sharedWithId,
},
type: 'PartnerV1',
},
]),
);
const acks = [initialSyncResponse[0].ack];
await sut.setAcks(auth, { acks });
const ackSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]);
expect(ackSyncResponse).toHaveLength(0);
});
it('should detect and sync a deleted partner', async () => {
const { auth, context, sut, testSync } = await setup();
const user1 = auth.user;
const user2 = await context.createUser();
const partner = await context.createPartner({ sharedById: user2.id, sharedWithId: user1.id });
await context.partnerRepository.remove(partner);
const response = await testSync(auth, [SyncRequestType.PartnersV1]);
expect(response).toHaveLength(1);
expect(response).toEqual(
expect.arrayContaining([
{
ack: expect.any(String),
data: {
sharedById: partner.sharedById,
sharedWithId: partner.sharedWithId,
},
type: 'PartnerDeleteV1',
},
]),
);
const acks = response.map(({ ack }) => ack);
await sut.setAcks(auth, { acks });
const ackSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]);
expect(ackSyncResponse).toHaveLength(0);
});
it('should detect and sync a partner share both to and from another user', async () => {
const { auth, context, sut, testSync } = await setup();
const user1 = auth.user;
const user2 = await context.createUser();
const partner1 = await context.createPartner({ sharedById: user2.id, sharedWithId: user1.id });
const partner2 = await context.createPartner({ sharedById: user1.id, sharedWithId: user2.id });
const response = await testSync(auth, [SyncRequestType.PartnersV1]);
expect(response).toHaveLength(2);
expect(response).toEqual(
expect.arrayContaining([
{
ack: expect.any(String),
data: {
inTimeline: partner1.inTimeline,
sharedById: partner1.sharedById,
sharedWithId: partner1.sharedWithId,
},
type: 'PartnerV1',
},
{
ack: expect.any(String),
data: {
inTimeline: partner2.inTimeline,
sharedById: partner2.sharedById,
sharedWithId: partner2.sharedWithId,
},
type: 'PartnerV1',
},
]),
);
await sut.setAcks(auth, { acks: [response[1].ack] });
const ackSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]);
expect(ackSyncResponse).toHaveLength(0);
});
it('should sync a partner and then an update to that same partner', async () => {
const { auth, context, sut, testSync } = await setup();
const user1 = auth.user;
const user2 = await context.createUser();
const partner = await context.createPartner({ sharedById: user2.id, sharedWithId: user1.id });
const initialSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]);
expect(initialSyncResponse).toHaveLength(1);
expect(initialSyncResponse).toEqual(
expect.arrayContaining([
{
ack: expect.any(String),
data: {
inTimeline: partner.inTimeline,
sharedById: partner.sharedById,
sharedWithId: partner.sharedWithId,
},
type: 'PartnerV1',
},
]),
);
const acks = [initialSyncResponse[0].ack];
await sut.setAcks(auth, { acks });
const updated = await context.partnerRepository.update(
{ sharedById: partner.sharedById, sharedWithId: partner.sharedWithId },
{ inTimeline: true },
);
const updatedSyncResponse = await testSync(auth, [SyncRequestType.PartnersV1]);
expect(updatedSyncResponse).toHaveLength(1);
expect(updatedSyncResponse).toEqual(
expect.arrayContaining([
{
ack: expect.any(String),
data: {
inTimeline: updated.inTimeline,
sharedById: updated.sharedById,
sharedWithId: updated.sharedWithId,
},
type: 'PartnerV1',
},
]),
);
});
it('should not sync a partner for an unrelated user', async () => {
const { auth, context, testSync } = await setup();
const user2 = await context.createUser();
const user3 = await context.createUser();
await context.createPartner({ sharedById: user2.id, sharedWithId: user3.id });
const response = await testSync(auth, [SyncRequestType.PartnersV1]);
expect(response).toHaveLength(0);
});
});
});

View File

@ -9,5 +9,7 @@ export const newSyncRepositoryMock = (): Mocked<RepositoryInterface<SyncReposito
deleteCheckpoints: vitest.fn(),
getUserUpserts: vitest.fn(),
getUserDeletes: vitest.fn(),
getPartnerUpserts: vitest.fn(),
getPartnerDeletes: vitest.fn(),
};
};