// ignore_for_file: implementation_imports, depend_on_referenced_packages import 'dart:async'; import 'dart:math'; import 'package:matrix/matrix.dart'; import 'package:sqflite_common/sqflite.dart'; import 'package:synchronized/synchronized.dart'; import 'package:fluffychat/pangea/analytics_data/derived_analytics_data_model.dart'; import 'package:fluffychat/pangea/analytics_misc/construct_type_enum.dart'; import 'package:fluffychat/pangea/analytics_misc/construct_use_model.dart'; import 'package:fluffychat/pangea/analytics_misc/construct_use_type_enum.dart'; import 'package:fluffychat/pangea/analytics_misc/constructs_event.dart'; import 'package:fluffychat/pangea/analytics_misc/constructs_model.dart'; import 'package:fluffychat/pangea/constructs/construct_identifier.dart'; import 'package:matrix/src/database/database_file_storage_stub.dart' if (dart.library.io) 'package:matrix/src/database/database_file_storage_io.dart'; import 'package:matrix/src/database/indexeddb_box.dart' if (dart.library.io) 'package:matrix/src/database/sqflite_box.dart'; class AnalyticsDatabase with DatabaseFileStorage { final String name; late BoxCollection _collection; late Box _lastEventTimestampBox; late Box _serverConstructsBox; late Box _localConstructsBox; late Box _aggregatedServerVocabConstructsBox; late Box _aggregatedLocalVocabConstructsBox; late Box _aggregatedServerMorphConstructsBox; late Box _aggregatedLocalMorphConstructsBox; late Box _derivedStatsBox; static const String _serverConstructsBoxName = 'box_server_constructs'; static const String _localConstructsBoxName = 'box_local_constructs'; /// Key is Tuple of construct lemma, type, and category static const String _aggregatedServerVocabConstructsBoxName = 'box_aggregated_server_vocab_constructs'; static const String _aggregatedLocalVocabConstructsBoxName = 'box_aggregated_local_vocab_constructs'; static const String _aggregatedServerMorphConstructsBoxName = 'box_aggregated_server_morph_constructs'; static const String _aggregatedLocalMorphConstructsBoxName = 'box_aggregated_local_morph_constructs'; static const String _derivedStatsBoxName = 'box_derived_stats'; static const String _lastEventTimestampBoxName = 'box_last_event_timestamp'; Database? database; /// Custom IdbFactory used to create the indexedDB. On IO platforms it would /// lead to an error to import "dart:indexed_db" so this is dynamically /// typed. final dynamic idbFactory; /// Custom SQFlite Database Factory used for high level operations on IO /// like delete. Set it if you want to use sqlite FFI. final DatabaseFactory? sqfliteFactory; static Future init( String name, { Database? database, dynamic idbFactory, DatabaseFactory? sqfliteFactory, Uri? fileStorageLocation, Duration? deleteFilesAfterDuration, }) async { final analyticsDatabase = AnalyticsDatabase._( name, database: database, idbFactory: idbFactory, sqfliteFactory: sqfliteFactory, fileStorageLocation: fileStorageLocation, deleteFilesAfterDuration: deleteFilesAfterDuration, ); await analyticsDatabase.open(); return analyticsDatabase; } AnalyticsDatabase._( this.name, { this.database, this.idbFactory, this.sqfliteFactory, Uri? fileStorageLocation, Duration? deleteFilesAfterDuration, }) { this.fileStorageLocation = fileStorageLocation; this.deleteFilesAfterDuration = deleteFilesAfterDuration; } final _lock = Lock(); Future open() async { _collection = await BoxCollection.open( name, { _lastEventTimestampBoxName, _serverConstructsBoxName, _localConstructsBoxName, _aggregatedServerVocabConstructsBoxName, _aggregatedLocalVocabConstructsBoxName, _aggregatedServerMorphConstructsBoxName, _aggregatedLocalMorphConstructsBoxName, _derivedStatsBoxName, }, sqfliteDatabase: database, sqfliteFactory: sqfliteFactory, idbFactory: idbFactory, version: MatrixSdkDatabase.version, ); _lastEventTimestampBox = _collection.openBox( _lastEventTimestampBoxName, ); _serverConstructsBox = _collection.openBox( _serverConstructsBoxName, ); _localConstructsBox = _collection.openBox( _localConstructsBoxName, ); _aggregatedServerVocabConstructsBox = _collection.openBox( _aggregatedServerVocabConstructsBoxName, ); _aggregatedLocalVocabConstructsBox = _collection.openBox( _aggregatedLocalVocabConstructsBoxName, ); _aggregatedServerMorphConstructsBox = _collection.openBox( _aggregatedServerMorphConstructsBoxName, ); _aggregatedLocalMorphConstructsBox = _collection.openBox( _aggregatedLocalMorphConstructsBoxName, ); _derivedStatsBox = _collection.openBox( _derivedStatsBoxName, ); } Future delete() async { await _collection.deleteDatabase( database?.path ?? name, sqfliteFactory ?? idbFactory, ); } Future clear() async { _lastEventTimestampBox.clearQuickAccessCache(); _serverConstructsBox.clearQuickAccessCache(); _localConstructsBox.clearQuickAccessCache(); _aggregatedServerVocabConstructsBox.clearQuickAccessCache(); _aggregatedLocalVocabConstructsBox.clearQuickAccessCache(); _aggregatedServerMorphConstructsBox.clearQuickAccessCache(); _aggregatedLocalMorphConstructsBox.clearQuickAccessCache(); _derivedStatsBox.clearQuickAccessCache(); await _collection.clear(); } Future _transaction(Future Function() action) { return _lock.synchronized(action); } Box _aggBox(ConstructTypeEnum type, bool local) => switch ((type, local)) { (ConstructTypeEnum.vocab, true) => _aggregatedLocalVocabConstructsBox, (ConstructTypeEnum.vocab, false) => _aggregatedServerVocabConstructsBox, (ConstructTypeEnum.morph, true) => _aggregatedLocalMorphConstructsBox, (ConstructTypeEnum.morph, false) => _aggregatedServerMorphConstructsBox, }; Future getUserID() => _lastEventTimestampBox.get('user_id'); Future getLastEventTimestamp() async { final timestampString = await _lastEventTimestampBox.get('last_event_timestamp'); if (timestampString == null) return null; return DateTime.parse(timestampString); } Future getDerivedStats() async { final raw = await _derivedStatsBox.get('derived_stats'); return raw == null ? DerivedAnalyticsDataModel() : DerivedAnalyticsDataModel.fromJson( Map.from(raw), ); } Future> getUses({ int? count, String? roomId, DateTime? since, ConstructUseTypeEnum? type, }) async { final stopwatch = Stopwatch()..start(); final results = []; bool addUseIfValid(OneConstructUse use) { if (since != null && use.timeStamp.isBefore(since)) { return false; // stop iteration entirely } if (roomId != null && use.metadata.roomId != roomId) { return true; // skip but continue } if (type != null && use.useType != type) { return true; // skip but continue } results.add(use); return count == null || results.length < count; } // ---- Local uses ---- final localUses = await getLocalUses() ..sort((a, b) => b.timeStamp.compareTo(a.timeStamp)); for (final use in localUses) { if (!addUseIfValid(use)) break; } if (count != null && results.length >= count) { stopwatch.stop(); Logs().i("Get uses took ${stopwatch.elapsedMilliseconds} ms"); return results; } // ---- Server uses ---- final serverKeys = await _serverConstructsBox.getAllKeys() ..sort( (a, b) => int.parse(b.split('|')[1]).compareTo(int.parse(a.split('|')[1])), ); for (final key in serverKeys) { final serverUses = await getServerUses(key) ..sort((a, b) => b.timeStamp.compareTo(a.timeStamp)); for (final use in serverUses) { if (!addUseIfValid(use)) break; } if (count != null && results.length >= count) break; } stopwatch.stop(); Logs().i("Get uses took ${stopwatch.elapsedMilliseconds} ms"); return results; } Future> getLocalUses() async { final List uses = []; final localKeys = await _localConstructsBox.getAllKeys(); final localValues = await _localConstructsBox.getAll(localKeys); for (final rawList in localValues) { if (rawList == null) continue; for (final raw in rawList) { final use = OneConstructUse.fromJson( Map.from(raw), ); uses.add(use); } } return uses; } Future> getServerUses(String key) async { final List uses = []; final serverValues = await _serverConstructsBox.get(key); if (serverValues == null) return []; for (final entry in serverValues) { uses.add( OneConstructUse.fromJson( Map.from(entry), ), ); } return uses; } Future getLocalConstructCount() async { final keys = await _localConstructsBox.getAllKeys(); return keys.length; } Future> getVocabConstructKeys() async { final serverKeys = await _aggregatedServerVocabConstructsBox.getAllKeys(); final localKeys = await _aggregatedLocalVocabConstructsBox.getAllKeys(); return [...serverKeys, ...localKeys]; } Future> getMorphConstructKeys() async { final serverKeys = await _aggregatedServerMorphConstructsBox.getAllKeys(); final localKeys = await _aggregatedLocalMorphConstructsBox.getAllKeys(); return [...serverKeys, ...localKeys]; } Future getConstructUse( List ids, ) async { assert(ids.isNotEmpty); final ConstructUses construct = ConstructUses( uses: [], constructType: ids.first.type, lemma: ids.first.lemma, category: ids.first.category, ); for (final id in ids) { final key = id.storageKey; ConstructUses? server; ConstructUses? local; final serverBox = _aggBox(id.type, false); final localBox = _aggBox(id.type, true); final serverRaw = await serverBox.get(key); if (serverRaw != null) { server = ConstructUses.fromJson( Map.from(serverRaw), ); } final localRaw = await localBox.get(key); if (localRaw != null) { local = ConstructUses.fromJson( Map.from(localRaw), ); } if (server != null) construct.merge(server); if (local != null) construct.merge(local); } return construct; } Future> getConstructUses( Map> ids, ) async { final Map results = {}; for (final entry in ids.entries) { final construct = await getConstructUse(entry.value); results[entry.key] = construct; } return results; } Future clearLocalConstructData() async { await _transaction(() async { await _localConstructsBox.clear(); await _aggregatedLocalVocabConstructsBox.clear(); await _aggregatedLocalMorphConstructsBox.clear(); }); } /// Group uses by aggregate key Map> _groupUses( List uses, ) { final Map> grouped = {}; for (final u in uses) { final key = u.identifier.storageKey; (grouped[key] ??= []).add(u); } return grouped; } Map _aggregateConstructs( Map> groups, Map?> existingRaw, ) { final Map updates = {}; for (final entry in groups.entries) { final key = entry.key; final usesForKey = entry.value; final raw = existingRaw[key]; ConstructUses model; if (raw != null) { model = ConstructUses.fromJson(Map.from(raw)); } else { final u = usesForKey.first; model = ConstructUses( uses: [], constructType: u.constructType, lemma: u.lemma, category: u.category, ); } model.addUses(usesForKey); updates[key] = model; } return updates; } Future> _aggregateFromBox( Box box, Map> grouped, ) async { final keys = grouped.keys.toList(); final existing = await box.getAll(keys); final existingMap = Map.fromIterables(keys, existing); return _aggregateConstructs(grouped, existingMap); } Future> getAggregatedConstructs( ConstructTypeEnum type, ) async { Map combined = {}; final stopwatch = Stopwatch()..start(); final localKeys = await _aggBox(type, true).getAllKeys(); final serverKeys = await _aggBox(type, false).getAllKeys(); final serverValues = await _aggBox(type, false).getAll(serverKeys); final serverConstructs = serverValues .map((e) => ConstructUses.fromJson(Map.from(e!))) .toList(); final serverAgg = Map.fromIterables( serverKeys, serverConstructs, ); if (localKeys.isEmpty) { combined = serverAgg; } else { final localValues = await _aggBox(type, true).getAll(localKeys); final localConstructs = localValues .map((e) => ConstructUses.fromJson(Map.from(e!))) .toList(); final localAgg = Map.fromIterables( localKeys, localConstructs, ); combined = Map.from(serverAgg); for (final entry in localAgg.entries) { final key = entry.key; final localModel = entry.value; if (combined.containsKey(key)) { final serverModel = combined[key]!; serverModel.merge(localModel); combined[key] = serverModel; } else { combined[key] = localModel; } } } stopwatch.stop(); Logs().i( "Combining aggregates took ${stopwatch.elapsedMilliseconds} ms", ); return combined.values.toList(); } Future updateUserID(String userID) { return _transaction(() async { await _lastEventTimestampBox.put( 'user_id', userID, ); }); } Future updateXPOffset(int offset) { return _transaction(() async { final stats = await getDerivedStats(); final updatedStats = stats.copyWith(offset: offset); await _derivedStatsBox.put( 'derived_stats', updatedStats.toJson(), ); }); } Future updateDerivedStats(DerivedAnalyticsDataModel newStats) => _derivedStatsBox.put( 'derived_stats', newStats.toJson(), ); Future updateServerAnalytics( List events, ) async { if (events.isEmpty) return; final stopwatch = Stopwatch()..start(); await _transaction(() async { final lastUpdated = await getLastEventTimestamp(); DateTime mostRecent = lastUpdated ?? events.first.event.originServerTs; final existingKeys = (await _serverConstructsBox.getAllKeys()).toSet(); final List aggregatedVocabUses = []; final List aggregatedMorphUses = []; final Map> pendingWrites = {}; for (final event in events) { final ts = event.event.originServerTs; final key = TupleKey( event.event.eventId, ts.millisecondsSinceEpoch.toString(), ).toString(); if (lastUpdated != null && ts.isBefore(lastUpdated)) continue; if (existingKeys.contains(key)) continue; if (ts.isAfter(mostRecent)) mostRecent = ts; pendingWrites[key] = event.content.uses; for (final u in event.content.uses) { u.constructType == ConstructTypeEnum.vocab ? aggregatedVocabUses.add(u) : aggregatedMorphUses.add(u); } } if (pendingWrites.isEmpty) return; // Write events sequentially for (final e in pendingWrites.entries) { _serverConstructsBox.put( e.key, e.value.map((u) => u.toJson()).toList(), ); } // Update aggregates final aggVocabUpdates = await _aggregateFromBox( _aggregatedServerVocabConstructsBox, _groupUses(aggregatedVocabUses), ); for (final entry in aggVocabUpdates.entries) { await _aggregatedServerVocabConstructsBox.put( entry.key, entry.value.toJson(), ); } final aggMorphUpdates = await _aggregateFromBox( _aggregatedServerMorphConstructsBox, _groupUses(aggregatedMorphUses), ); for (final entry in aggMorphUpdates.entries) { await _aggregatedServerMorphConstructsBox.put( entry.key, entry.value.toJson(), ); } // Update timestamp await _lastEventTimestampBox.put( 'last_event_timestamp', mostRecent.toIso8601String(), ); }); stopwatch.stop(); Logs().i( "Server analytics update took ${stopwatch.elapsedMilliseconds} ms", ); } Future updateLocalAnalytics( List uses, ) async { if (uses.isEmpty) return; final stopwatch = Stopwatch()..start(); await _transaction(() async { // Store local constructs final key = DateTime.now().millisecondsSinceEpoch; _localConstructsBox.put( key.toString(), uses.map((u) => u.toJson()).toList(), ); final List vocabUses = []; final List morphUses = []; for (final u in uses) { u.constructType == ConstructTypeEnum.vocab ? vocabUses.add(u) : morphUses.add(u); } // Update aggregates final aggVocabUpdates = await _aggregateFromBox( _aggregatedLocalVocabConstructsBox, _groupUses(vocabUses), ); for (final entry in aggVocabUpdates.entries) { await _aggregatedLocalVocabConstructsBox.put( entry.key, entry.value.toJson(), ); } final aggMorphUpdates = await _aggregateFromBox( _aggregatedLocalMorphConstructsBox, _groupUses(morphUses), ); for (final entry in aggMorphUpdates.entries) { await _aggregatedLocalMorphConstructsBox.put( entry.key, entry.value.toJson(), ); } }); stopwatch.stop(); Logs().i("Local analytics update took ${stopwatch.elapsedMilliseconds} ms"); } }