fix: expose stream of newly collected constructs and use it to update new tokens cache on analytics update (#5169)

This commit is contained in:
ggurdin 2026-01-12 10:24:55 -05:00 committed by GitHub
parent f920061f1b
commit ebe22129bc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 34 additions and 0 deletions

View file

@ -191,6 +191,8 @@ class ChatController extends State<ChatPageWithRoom>
StreamSubscription? _levelSubscription;
StreamSubscription? _constructsSubscription;
StreamSubscription? _tokensSubscription;
StreamSubscription? _botAudioSubscription;
final timelineUpdateNotifier = _TimelineUpdateNotifier();
late final ActivityChatController activityController;
@ -477,6 +479,11 @@ class ChatController extends State<ChatPageWithRoom>
);
}
void _onTokenUpdate(Set<ConstructIdentifier> constructs) {
if (constructs.isEmpty) return;
TokensUtil.clearNewTokenCache();
}
Future<void> _botAudioListener(SyncUpdate update) async {
if (update.rooms?.join?[roomId]?.timeline?.events == null) return;
final timeline = update.rooms!.join![roomId]!.timeline!;
@ -527,6 +534,9 @@ class ChatController extends State<ChatPageWithRoom>
_constructsSubscription =
updater.unlockedConstructsStream.stream.listen(_onUnlockConstructs);
_tokensSubscription =
updater.newConstructsStream.stream.listen(_onTokenUpdate);
_botAudioSubscription = room.client.onSync.stream.listen(_botAudioListener);
activityController = ActivityChatController(
@ -790,6 +800,7 @@ class ChatController extends State<ChatPageWithRoom>
_levelSubscription?.cancel();
_botAudioSubscription?.cancel();
_constructsSubscription?.cancel();
_tokensSubscription?.cancel();
_router.routeInformationProvider.removeListener(_onRouteChanged);
choreographer.timesDismissedIT.removeListener(_onCloseIT);
scrollController.dispose();

View file

@ -380,6 +380,9 @@ class AnalyticsDataService {
await _ensureInitialized();
final blocked = blockedConstructs;
final newUnusedConstructs =
updateIds.where((id) => !hasUsedConstruct(id)).toSet();
_mergeTable.addConstructsByUses(update.addedConstructs, blocked);
await _analyticsClientGetter.database.updateLocalAnalytics(
update.addedConstructs,
@ -437,6 +440,10 @@ class AnalyticsDataService {
events.add(ConstructBlockedEvent(update.blockedConstruct!));
}
if (newUnusedConstructs.isNotEmpty) {
events.add(NewConstructsEvent(newUnusedConstructs));
}
return events;
}

View file

@ -43,6 +43,9 @@ class AnalyticsUpdateDispatcher {
final StreamController<LevelUpdate> levelUpdateStream =
StreamController<LevelUpdate>.broadcast();
final StreamController<Set<ConstructIdentifier>> newConstructsStream =
StreamController<Set<ConstructIdentifier>>.broadcast();
final StreamController<MapEntry<ConstructIdentifier, UserSetLemmaInfo>>
_lemmaInfoUpdateStream = StreamController<
MapEntry<ConstructIdentifier, UserSetLemmaInfo>>.broadcast();
@ -98,6 +101,9 @@ class AnalyticsUpdateDispatcher {
case final ConstructBlockedEvent e:
_onBlockedConstruct(e.blockedConstruct);
break;
case final NewConstructsEvent e:
_onNewConstruct(e.newConstructs);
break;
}
}
@ -137,4 +143,9 @@ class AnalyticsUpdateDispatcher {
);
constructUpdateStream.add(update);
}
void _onNewConstruct(Set<ConstructIdentifier> constructIds) {
if (constructIds.isEmpty) return;
newConstructsStream.add(constructIds);
}
}

View file

@ -23,3 +23,8 @@ class ConstructBlockedEvent extends AnalyticsUpdateEvent {
final ConstructIdentifier blockedConstruct;
ConstructBlockedEvent(this.blockedConstruct);
}
class NewConstructsEvent extends AnalyticsUpdateEvent {
final Set<ConstructIdentifier> newConstructs;
NewConstructsEvent(this.newConstructs);
}