"""
Copyright (c) 2017-2018, CodeLV.
Distributed under the terms of the MIT License.
The full license is in the file LICENSE, distributed with this software.
Created on Feb 11, 2018
"""
import os
from asyncio import Future
from atom.api import Dict, List, Typed, Event, Value
from .android_content import (
BroadcastReceiver,
Context,
SystemService,
PendingIntent,
Intent,
)
from .android_utils import HashMap
from .bridge import JavaBridgeObject, JavaMethod
def find_library(lib: str) -> str:
"""Locate a library with of the given name"""
lib_dir = os.environ["PY_LIB_DIR"]
path = f"{lib_dir}/{lib}.so"
if os.path.exists(path):
return path
return ""
def libusb_init(self, lib):
"""Patch pyusb's libusb1 backend to work on android"""
from ctypes import POINTER, c_void_p, c_int, byref
from usb.backend import IBackend, libusb1
IBackend.__init__(self)
self.lib = lib
self.ctx = c_void_p()
NO_DEVICE_DISCOVERY = 2
try:
_check = libusb1._check
lib.libusb_wrap_sys_device.argtypes = [c_void_p, c_void_p, POINTER(c_void_p)]
lib.libusb_wrap_sys_device.restype = c_int
lib.libusb_get_device.argtypes = [c_void_p]
lib.libusb_get_device.restype = c_void_p
_check(self.lib.libusb_set_option(None, NO_DEVICE_DISCOVERY, None))
_check(self.lib.libusb_init(byref(self.ctx)))
assert self.ctx
except Exception as e:
print(e)
raise
def load_libusb():
"""Apply a patch to pyusb and load the libusb1 backend"""
# Patch ctypes find_library to look in the correct location
# import ctypes.util
# ctypes.util.find_library = find_library
# Patch the LibUSB init to disable device discovery
from usb.backend import libusb1
libusb1._LibUSB.__init__ = libusb_init
return libusb1.get_backend(find_library)
class UsbEndpoint(JavaBridgeObject):
__nativeclass__ = "android.hardware.usb.UsbEndpoint"
getAddress = JavaMethod(returns=int)
getAttributes = JavaMethod(returns=int)
getDirection = JavaMethod(returns=int)
getEndpointNumber = JavaMethod(returns=int)
getInterval = JavaMethod(returns=int)
getMaxPacketSize = JavaMethod(returns=int)
class UsbInterface(JavaBridgeObject):
__nativeclass__ = "android.hardware.usb.UsbInterface"
getAlternateSetting = JavaMethod(returns=int)
getEndpoint = JavaMethod(int, returns=UsbEndpoint)
getEndpointCount = JavaMethod(returns=int)
getId = JavaMethod(returns=int)
getInterfaceClass = JavaMethod(returns=int)
getInterfaceProtocol = JavaMethod(returns=int)
getInterfaceSubclass = JavaMethod(returns=int)
getName = JavaMethod(returns=str)
class UsbConfiguration(JavaBridgeObject):
__nativeclass__ = "android.hardware.usb.UsbConfiguration"
class UsbRequest(JavaBridgeObject):
__nativeclass__ = "android.hardware.usb.UsbRequest"
cancel = JavaMethod()
close = JavaMethod()
getClientData = JavaMethod(returns=object)
getEndpoint = JavaMethod(returns=UsbEndpoint)
initialize = JavaMethod(lambda: UsbDeviceConnection, UsbEndpoint)
queue = JavaMethod("java.nio.ByteBuffer")
setClientData = JavaMethod(object)
[docs]class UsbDevice(JavaBridgeObject):
__nativeclass__ = "android.hardware.usb.UsbDevice"
info = Dict()
getDeviceName = JavaMethod(returns=str)
getDeviceId = JavaMethod(returns=int)
getInterface = JavaMethod(int, returns=UsbInterface)
getConfiguration = JavaMethod(int, returns=UsbConfiguration)
[docs] def __del__(self):
if UsbManager._instance and self.__id__ in UsbManager._instance.devices:
return # Do not destroy
super().__del__()
class UsbDeviceConnection(JavaBridgeObject):
__nativeclass__ = "android.hardware.usb.UsbDeviceConnection"
close = JavaMethod()
getFileDescriptor = JavaMethod(returns=int)
getSerial = JavaMethod(returns=str)
claimInterface = JavaMethod(UsbInterface, bool, returns=bool)
releaseInterface = JavaMethod(UsbInterface, returns=bool)
setInterface = JavaMethod(UsbInterface, returns=bool)
setConfiguration = JavaMethod(UsbConfiguration, returns=bool)
bulkTransfer = JavaMethod(UsbEndpoint, "[B", int, int)
bulkTransfer_ = JavaMethod(UsbEndpoint, "[B", int, int, int)
getRawDescriptors = JavaMethod(returns="[B")
requestWait = JavaMethod("long", returns=UsbRequest)
# Internal ctypes handles so this can be used with pyusb
devid = Value()
handle = Value()
async def acquire(self):
"""Acquire a pyusb device for this connection."""
from ctypes import c_void_p, byref
from usb.core import Device
from usb.backend import libusb1
fd = await self.getFileDescriptor()
backend = load_libusb()
self.handle = c_void_p()
libusb1._check(
backend.lib.libusb_wrap_sys_device(backend.ctx, fd, byref(self.handle))
)
self.devid = backend.lib.libusb_get_device(self.handle)
dev = Device(self, backend)
dev._ctx.handle = self # Already opened
return dev
class UsbAccessory(JavaBridgeObject):
__nativeclass__ = "android.hardware.usb.UsbAccessory"
[docs]class UsbManager(SystemService):
"""Use UsbManger.get().then(on_ready) to get an instance."""
__nativeclass__ = "android.hardware.usb.UsbManager"
EXTRA_PERMISSION_GRANTED = "permission"
EXTRA_DEVICE = "device"
ACTION_USB_PERMISSION = "com.codelv.enamlnative.USB_PERMISSION"
ACTION_USB_DEVICE_ATTACHED = "android.hardware.usb.action.USB_DEVICE_ATTACHED"
ACTION_USB_DEVICE_DETACHED = "android.hardware.usb.action.USB_DEVICE_DETACHED"
ACTION_USB_ACCESSORY_ATTACHED = (
"android.hardware.usb.action.ACTION_USB_ACCESSORY_ATTACHED"
)
ACTION_USB_ACCESSORY_DETACHED = (
"android.hardware.usb.action.ACTION_USB_ACCESSORY_DETACHED"
)
SERVICE_TYPE = Context.USB_SERVICE
getAccessoryList = JavaMethod(returns="android.hardware.usb.UsbAccessory[")
getDeviceList = JavaMethod(returns=HashMap)
openAccessory = JavaMethod(UsbAccessory, returns="android.os.ParcelFileDescriptor")
openDevice = JavaMethod(UsbDevice, returns=UsbDeviceConnection)
hasPermission = JavaMethod(UsbDevice, returns=bool)
requestPermission = JavaMethod(UsbDevice, PendingIntent)
#: These names are changed to support both signatures
hasPermission_ = JavaMethod(UsbAccessory, returns=bool)
requestPermission_ = JavaMethod(UsbAccessory, PendingIntent)
receiver = Typed(BroadcastReceiver)
permission_intent = Typed(PendingIntent)
pending_requests = List(Future)
#: Hold references to devices
devices = Dict(int, UsbDevice)
#: You can listen for this
device_attached = Event(UsbDevice)
device_detached = Event(UsbDevice)
def _default_receiver(self):
return BroadcastReceiver.for_action(
[
UsbManager.ACTION_USB_PERMISSION,
UsbManager.ACTION_USB_DEVICE_ATTACHED,
UsbManager.ACTION_USB_DEVICE_DETACHED,
],
self.on_device_action_received,
single_shot=False,
)
def _default_permission_intent(self):
app = self.__app__
assert app is not None
d = app.activity
assert d is not None
proxy = d.proxy
assert proxy is not None
activity = proxy.widget
assert activity is not None
intent = Intent(UsbManager.ACTION_USB_PERMISSION)
intent_id = PendingIntent.getBroadcast(activity, 0, intent, 0)
return PendingIntent(__id__=intent_id)
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Overridden to make sure it's listening
assert self.receiver is not None
assert self.permission_intent is not None
[docs] async def on_device_action_received(self, ctx, intent: Intent):
"""Invoked when the receiver fires"""
app = self.__app__
assert app is not None
# The bridge sends the intent as a dict
action = intent.action
if action == UsbManager.ACTION_USB_PERMISSION:
await self.on_device_permssion_result(ctx, intent)
elif action == UsbManager.ACTION_USB_DEVICE_ATTACHED:
await self.on_device_attached(ctx, intent)
elif action == UsbManager.ACTION_USB_DEVICE_DETACHED:
await self.on_device_detached(ctx, intent)
async def on_device_permssion_result(self, ctx, intent: Intent):
info = await intent.getParcelableExtra(UsbManager.EXTRA_DEVICE)
allowed = await intent.getBooleanExtra(
UsbManager.EXTRA_PERMISSION_GRANTED, False
)
try:
if allowed and info:
dev_id = info["id"]
dev = self.devices.get(dev_id, None)
if dev is None:
dev = UsbDevice(__id__=dev_id, info=info)
self.devices[dev_id] = dev
for f in self.pending_requests:
f.set_result(dev)
else:
err = PermissionError("Access to USB device denied")
for f in self.pending_requests:
f.set_exception(err)
finally:
self.clean_pending()
async def on_device_attached(self, ctx, intent: Intent):
info = await intent.getParcelableExtra(UsbManager.EXTRA_DEVICE)
if info:
dev_id = info["id"]
dev = self.devices.get(dev_id, None)
if dev is None:
dev = UsbDevice(__id__=dev_id, info=info)
self.devices[dev_id] = dev
self.device_attached(dev) # type: ignore
async def on_device_detached(self, ctx, intent: Intent):
info = await intent.getParcelableExtra(UsbManager.EXTRA_DEVICE)
if info:
dev_id = info["id"]
dev = self.devices.pop(dev_id, None)
if dev is None:
dev = UsbDevice(__id__=dev_id, info=info)
self.device_detached(dev) # type: ignore
def clean_pending(self):
pending = self.pending_requests
for f in pending[:]:
if f.done():
try:
pending.remove(f)
except ValueError:
pass
async def request_permission(self, device: UsbDevice) -> bool:
permission_intent = self.permission_intent
assert permission_intent is not None
app = self.__app__
assert app is not None
f = app.create_future()
self.pending_requests.append(f)
self.requestPermission(device, permission_intent)
return await f
[docs] async def get_devices(self) -> list[UsbDevice]:
"""Load devices and update the internal list. Since the bridge
caches a reference here it important to not delete it until it
detaches.
"""
devices = {}
result = await self.getDeviceList()
for info in result.values():
dev_id = info["id"]
dev = self.devices.get(dev_id, None)
if dev is None:
dev = UsbDevice(__id__=dev_id, info=info)
devices[dev_id] = dev
self.devices = devices
return list(self.devices.values())