vimPlugins.fcitx.vim/fcitx-status
2021-04-13 13:42:17 +08:00

69 lines
1.5 KiB
Python
Executable file

#!/usr/bin/python3
import asyncio
import functools
import dbus
def may_reconnect(func):
    """Wrap a method so that a failed call reconnects and is tried once more.

    The wrapped method is attempted at most twice.  Whenever an attempt
    raises ``dbus.exceptions.DBusException`` or ``AttributeError``,
    ``self.connect()`` is called before moving on.  If both attempts fail
    the wrapper returns ``None``; exceptions raised by ``self.connect()``
    itself are not caught here.
    """
    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        attempts_left = 2
        while attempts_left:
            attempts_left -= 1
            try:
                return func(self, *args, **kwargs)
            except (dbus.exceptions.DBusException, AttributeError):
                # AttributeError is caught as well as D-Bus errors, so a call
                # made on an object that has no proxy attribute yet also
                # ends up here and triggers a (re)connect.
                self.connect()
        # Both attempts failed: fall through without a result.
        return None

    return wrapped
class FcitxComm:
    """Thin wrapper around the fcitx5 controller object on the session bus.

    No connection is made at construction time.  ``self.fcitx`` only exists
    after ``connect()`` has run; the public methods are decorated with
    ``may_reconnect``, which calls ``connect()`` when a call fails with
    ``AttributeError`` or a D-Bus exception.
    """

    def __init__(self):
        """Create the wrapper without connecting; connection is on demand."""

    def connect(self):
        """(Re)create the D-Bus interface proxy and store it in ``self.fcitx``."""
        session_bus = dbus.SessionBus()
        controller = session_bus.get_object('org.fcitx.Fcitx5', '/controller')
        self.fcitx = dbus.Interface(
            controller,
            dbus_interface='org.fcitx.Fcitx.Controller1',
        )

    @may_reconnect
    def status(self):
        """Return ``True`` when the controller's ``State()`` equals 2."""
        # NOTE(review): 2 presumably means "input method active" -- confirm
        # against the fcitx5 Controller1 interface.
        state = self.fcitx.State()
        return state == 2

    @may_reconnect
    def activate(self):
        """Call ``Activate()`` on the controller."""
        self.fcitx.Activate()

    @may_reconnect
    def deactivate(self):
        """Call ``Deactivate()`` on the controller."""
        self.fcitx.Deactivate()
async def fcitx_serve(Fcitx, reader, writer):
    """Handle one client connection, processing one-byte commands until EOF.

    Commands, as single bytes read from ``reader``:

    * ``0`` -- query: reply with one byte, ``1`` if ``Fcitx.status()`` is
      truthy, otherwise ``0``.
    * ``1`` -- call ``Fcitx.activate()``; nothing is sent back.
    * ``2`` -- call ``Fcitx.deactivate()``; nothing is sent back.

    Any other byte value is ignored.  The loop ends when the client closes
    its side of the connection (``read`` returns an empty result).

    Args:
        Fcitx: object providing ``status()``, ``activate()`` and
            ``deactivate()``.
        reader: stream reader for the client connection.
        writer: stream writer for the client connection; it is always
            closed before this coroutine finishes.
    """
    try:
        while True:
            data = await reader.read(1)
            if not data:
                # Empty read means the peer closed the connection.
                break

            comm = data[0]
            if comm == 0:
                st = 1 if Fcitx.status() else 0
                writer.write(st.to_bytes(1, 'little'))
                await writer.drain()
            elif comm == 1:
                Fcitx.activate()
            elif comm == 2:
                Fcitx.deactivate()
    finally:
        # Release the socket on every exit path (EOF or an exception from
        # the controller); otherwise the transport stays open until it is
        # garbage collected.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # The peer may already have dropped the connection; there is
            # nothing left to clean up in that case.
            pass
async def main():
    """Start the Unix-socket server and serve clients until cancelled.

    One ``FcitxComm`` instance is created and shared by every connection;
    each connection is handled by ``fcitx_serve`` with that instance bound
    as its first argument.
    """
    comm = FcitxComm()
    handle_client = functools.partial(fcitx_serve, comm)
    # NOTE(review): the leading NUL byte presumably selects a Linux
    # abstract-namespace socket rather than a filesystem path -- confirm.
    server = await asyncio.start_unix_server(
        handle_client,
        path='\0fcitx-status',
    )
    async with server:
        await server.serve_forever()
if __name__ == '__main__':
    # setproctitle is optional: when it can be imported the process title is
    # set to 'fcitx-status'; when it is missing the rename is simply skipped.
    try:
        import setproctitle
        setproctitle.setproctitle('fcitx-status')
    except ImportError:
        pass

    asyncio.run(main())