#!/usr/bin/python3
"""Tiny daemon exposing fcitx5's input-method state over a Unix socket.

Clients connect to the abstract-namespace socket ``\\0fcitx-status`` and send
one-byte commands:

* ``0`` -- query state; the server answers with one byte (1 = active, 0 = not)
* ``1`` -- activate the input method (no reply)
* ``2`` -- deactivate the input method (no reply)

Any other byte is ignored.
"""

import asyncio
import functools

import dbus

# One-byte commands understood by fcitx_serve().
_CMD_STATUS = 0
_CMD_ACTIVATE = 1
_CMD_DEACTIVATE = 2


def may_reconnect(func):
    """Decorate a ``FcitxComm`` method so it (re)connects on demand.

    The wrapped method is tried once. If it fails with a D-Bus error, or with
    ``AttributeError`` (raised while ``self.fcitx`` is still ``None`` because
    no connection has been made yet), ``self.connect()`` is called and the
    method is tried exactly one more time.

    Returns:
        The wrapped method's result, or ``None`` when the retry fails too
        (best effort: callers treat ``None`` as "not active").

    Raises:
        Whatever ``self.connect()`` raises if the reconnect itself fails.
    """
    retryable = (dbus.exceptions.DBusException, AttributeError)

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except retryable:
            # Stale or missing proxy: build a fresh one, then retry once.
            self.connect()
        try:
            return func(self, *args, **kwargs)
        except retryable:
            # Still failing right after a successful reconnect. Reconnecting
            # again would achieve nothing, so give up quietly.
            return None

    return wrapped


class FcitxComm:
    """Lazy D-Bus client for the fcitx5 controller object."""

    def __init__(self):
        # Don't connect on start; connect on demand. While this is None the
        # first decorated call raises AttributeError, which may_reconnect
        # turns into a connect() followed by a retry.
        self.fcitx = None

    def connect(self):
        """Create a fresh controller proxy on the session bus."""
        bus = dbus.SessionBus()
        obj = bus.get_object('org.fcitx.Fcitx5', '/controller')
        self.fcitx = dbus.Interface(
            obj, dbus_interface='org.fcitx.Fcitx.Controller1')

    @may_reconnect
    def status(self):
        """Return True when the controller reports state 2.

        NOTE(review): 2 is presumably fcitx5's "active" state -- confirm
        against the Controller1 interface.
        """
        return self.fcitx.State() == 2

    @may_reconnect
    def activate(self):
        """Ask fcitx5 to activate the input method."""
        self.fcitx.Activate()

    @may_reconnect
    def deactivate(self):
        """Ask fcitx5 to deactivate the input method."""
        self.fcitx.Deactivate()


async def fcitx_serve(Fcitx, reader, writer):
    """Serve one client connection until it sends EOF.

    Args:
        Fcitx: the shared ``FcitxComm`` instance.
        reader: ``asyncio.StreamReader`` for the client socket.
        writer: ``asyncio.StreamWriter`` for the client socket.

    The D-Bus calls are synchronous and run on the event loop thread.
    """
    try:
        while True:
            data = await reader.read(1)
            if not data:
                break  # client closed its end
            command = data[0]
            if command == _CMD_STATUS:
                st = 1 if Fcitx.status() else 0
                writer.write(st.to_bytes(1, 'little'))
                await writer.drain()
            elif command == _CMD_ACTIVATE:
                Fcitx.activate()
            elif command == _CMD_DEACTIVATE:
                Fcitx.deactivate()
    finally:
        # Always release the server-side socket, including when a D-Bus error
        # escapes the loop; otherwise it lingers until garbage collection and
        # a client waiting for a reply never sees EOF.
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            pass  # peer already gone; nothing left to clean up


async def main():
    """Listen on the abstract Unix socket and serve clients forever."""
    fcitx = FcitxComm()
    server = await asyncio.start_unix_server(
        functools.partial(fcitx_serve, fcitx),
        path='\0fcitx-status')
    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    try:
        # Optional nicety: give the process a recognisable name.
        import setproctitle
        setproctitle.setproctitle('fcitx-status')
    except ImportError:
        pass
    asyncio.run(main())