import asyncio
import atexit
import collections
import ctypes
import functools
import inspect
import sys
import threading
import time
import traceback
from typing import (

from import cadef, dbr

from .types import AugmentedValue, Count, Datatype, Dbe, Format, Timeout

T = TypeVar("T")
PVs = Union[List[str], Tuple[str, ...]]


# patch into
cadef.ca_current_context = cadef.libca.ca_current_context
cadef.ca_current_context.restype = ctypes.c_void_p

cadef.ca_attach_context = cadef.libca.ca_attach_context
cadef.ca_attach_context.argtypes = [ctypes.c_void_p]
cadef.ca_attach_context.errcheck = cadef.expect_ECA_NORMAL

cadef.ca_detach_context = cadef.libca.ca_detach_context

class ValueEvent(Generic[T]):
    def __init__(self) -> None:
        self.value: Union[T, Exception] = RuntimeError("No value set")
        self._event = asyncio.Event()

    def set(self, value: Union[T, Exception]):
        self.value = value

    def is_set(self) -> bool:
        return self._event.is_set()

    def clear(self):
        self.value = RuntimeError("No value set")

    async def wait(self) -> T:
        if not self._event.is_set():
            await self._event.wait()
        if isinstance(self.value, Exception):
            raise self.value
            return self.value

[docs]class CANothing(Exception): """This value is returned as a success or failure indicator from `caput`, as a failure indicator from `caget`, and may be raised as an exception to report a data error on caget or caput with wait.""" def __init__(self, name: str, errorcode=cadef.ECA_NORMAL) -> None: #: Name of the PV str = name #: True for successful completion, False for error code self.ok: int = errorcode == cadef.ECA_NORMAL #: ECA error code self.errorcode: int = errorcode def __repr__(self): return "CANothing(%r, %d)" % (, self.errorcode) def __str__(self): return "%s: %s" % (, cadef.ca_message(self.errorcode)) def __bool__(self): return self.ok
def maybe_throw(async_function): """Function decorator for optionally catching exceptions. Exceptions raised by the wrapped function are normally propagated unchanged, but if throw=False is specified as a keyword argument then the exception is transformed into an ordinary CANothing value!""" @functools.singledispatch async def throw_wrapper( pv, *args, timeout: Timeout = DEFAULT_TIMEOUT, throw=True, **kwargs ): awaitable = ca_timeout(async_function(pv, *args, **kwargs), pv, timeout) if throw: return await awaitable else: # We catch all the expected exceptions, converting them into # CANothing() objects as appropriate. Any unexpected exceptions # will be raised anyway, which seems fair enough! try: return await awaitable except CANothing as error: return error except cadef.CAException as error: return CANothing(pv, error.status) except cadef.Disconnected: return CANothing(pv, cadef.ECA_DISCONN) # The singledispatch decorator makes a sync wrapper. We need it to be # async so it works with inspect.iscoroutine, so wrap it again @functools.wraps(async_function) async def call_wrapper(*args, **kwargs): return await throw_wrapper(*args, **kwargs) # But keep the register function and register a signature that includes # the extras we added call_wrapper.register = throw_wrapper.register original_sig = inspect.signature(async_function) throw_parameters = inspect.signature(throw_wrapper).parameters parameters = [ *original_sig.parameters.values(), throw_parameters["timeout"], throw_parameters["throw"], ] call_wrapper.__signature__ = original_sig.replace(parameters=parameters) return call_wrapper async def ca_timeout(awaitable: Awaitable[T], name: str, timeout: Timeout = None) -> T: """Wait for awaitable to complete, with timeout one of: None Wait forever interval Wait for this amount of seconds (deadline,) Wait until this absolute timestamp Convert any timeouts into a CANothing timeout containing the pv name """ if timeout is not None: # Convert (abstimeout,) to relative timeout for asyncio if isinstance(timeout, tuple): timeout = timeout[0] - time.time() try: result = await asyncio.wait_for(awaitable, timeout) except asyncio.TimeoutError as e: raise CANothing(name, cadef.ECA_TIMEOUT) from e else: result = await awaitable return result def parallel_timeout(kwargs: Dict[str, Any]) -> Dict[str, Any]: """Return kwargs with a suitable timeout for running in parallel""" if kwargs.get("throw", True): # told to throw, so remove the timeout as it will be done at the top level kwargs = dict(kwargs, timeout=None) return kwargs async def in_parallel( awaitables: Sequence[Awaitable[T]], kwargs: Dict[str, Any] ) -> List[T]: if kwargs.get("throw", True): # timeout at this level, awaitables will not timeout themselves timeout = kwargs.get("timeout", DEFAULT_TIMEOUT) results = await ca_timeout(asyncio.gather(*awaitables), "Multiple PVs", timeout) else: # timeout being done at the level of each awaitable results = await asyncio.gather(*awaitables) return list(results) # ---------------------------------------------------------------------------- # Channel object and cache class Channel(object): """Wraps a single channel access channel object.""" __slots__ = [ "name", "__subscriptions", # Set of listening subscriptions "__connect_event", # Connection event used to notify changes "__event_loop", "_as_parameter_", # Associated channel access channel handle ] @staticmethod @cadef.connection_handler def on_ca_connect(args) -> None: # pragma: no cover """This routine is called every time the connection status of the channel changes. This is called directly from channel access, which means that user callbacks should not be called directly.""" self: Channel = cadef.ca_puser(args.chid) op = args.op self.__event_loop.call_soon_threadsafe(self.on_ca_connect_, op) def on_ca_connect_(self, op): assert op in [cadef.CA_OP_CONN_UP, cadef.CA_OP_CONN_DOWN] connected = op == cadef.CA_OP_CONN_UP if connected: # Trigger wakeup of all listeners self.__connect_event.set(None) else: self.__connect_event.clear() # Inform all the connected subscriptions for subscription in self.__subscriptions: subscription._on_connect(connected) def __init__(self, name: str, loop: asyncio.AbstractEventLoop): """Creates a channel access channel with the given name.""" = name self.__subscriptions: Set[Subscription] = set() self.__connect_event = ValueEvent[None]() self.__event_loop = loop chid = ctypes.c_void_p() cadef.ca_create_channel( name, self.on_ca_connect, ctypes.py_object(self), 0, ctypes.byref(chid) ) # Setting this allows a channel object to autoconvert into the chid # when passed to ca_ functions. self._as_parameter_ = chid.value _flush_io() def _purge(self): """Forcible purge of channel. As well as closing the channels, ensures that all subscriptions attached to the channel are also closed.""" for subscription in list(self.__subscriptions): subscription.close() cadef.ca_clear_channel(self) del self._as_parameter_ _flush_io() def _add_subscription(self, subscription): """Adds the given subscription to the list of receivers of connection notification.""" self.__subscriptions.add(subscription) def _remove_subscription(self, subscription): """Removes the given subscription from the list of receivers.""" self.__subscriptions.remove(subscription) def count_subscriptions(self) -> int: """Return the number of currently active subscriptions for this Channel""" return len(self.__subscriptions) def connected(self) -> bool: """Return whether this Channel is currently connected to a PV""" return self.__connect_event.is_set() async def wait(self): """Waits for the channel to become connected if not already connected.""" await self.__connect_event.wait() class ChannelCache(object): """A cache of all open channels. If a channel is not present in the cache it is automatically opened. The cache needs to be purged to ensure a clean shutdown.""" def __init__(self, loop: asyncio.AbstractEventLoop): self.__channels: Dict[str, Channel] = {} self.__loop = loop self.__waiting = True self.__callbacks: Deque[Tuple[Callable, Tuple]] = collections.deque() def get_channel(self, name: str) -> Channel: try: # When the channel already exists, just return that return self.__channels[name] except KeyError: # Have to create a new channel channel = Channel(name, self.__loop) self.__channels[name] = channel return channel def get_channels(self) -> List[Channel]: return list(self.__channels.values()) def purge(self): """Purges all the channels in the cache: closes them right now. Will cause other channel access to fail, so only to be done on shutdown.""" for channel in self.__channels.values(): channel._purge() self.__channels.clear() def __call_callbacks(self): while self.__callbacks: callback, args = self.__callbacks.popleft() callback(*args) self.__waiting = True def call_soon_threadsafe(self, callback: Callable, *args): # pragma: no cover self.__callbacks.append((callback, args)) if self.__waiting: self.__waiting = False self.__loop.call_soon_threadsafe(self.__call_callbacks) # ---------------------------------------------------------------------------- # camonitor
[docs]class Subscription(object): """A Subscription object wraps a single channel access subscription, and notifies all updates through an event queue.""" __slots__ = [ "name", # Name of the PV subscribed to "callback", # The user callback function "dbr_to_value", # Conversion from dbr "channel", # The associated channel object "state", # Whether the subscription is active "dropped_callbacks", # How many values have been dropped without callback "_as_parameter_", # Associated channel access subscription handle "all_updates", # True iff all updates delivered without merging "notify_disconnect", # Whether to report disconnect events "pending_values", # Most recent updates from event handler "__lock", # Lock around pending_values and dropped_callbacks "__event_loop", # The event loop the caller created this from "__call_soon_threadsafe", # Reference to channel_cache.call_soon_threadsafe "__is_async", # If the callback is awaitable "__task", # Task for subscription updates "__future", # Future which will be updated if __is_async and pending_values ] # Subscription state values: OPENING = 0 # Subscription not complete yet OPEN = 1 # Normally active CLOSED = 2 # Closed but not yet deleted # Mapping from format to event mask for default events __default_events = { dbr.FORMAT_RAW: cadef.DBE_VALUE, dbr.FORMAT_TIME: cadef.DBE_VALUE | cadef.DBE_ALARM, dbr.FORMAT_CTRL: cadef.DBE_VALUE | cadef.DBE_ALARM | cadef.DBE_PROPERTY, } def __init__( self, name: str, callback: Callable[[Any], Union[None, Awaitable]], events: Dbe, datatype: Datatype, format: Format, count: Count, all_updates: bool, notify_disconnect: bool, connect_timeout: Timeout, ): = name self.callback = callback self.all_updates = all_updates self.notify_disconnect = notify_disconnect #: The number of updates that have been dropped as they happened #: while another callback was in progress self.dropped_callbacks: int = 0 self.__event_loop = asyncio.get_event_loop() self.pending_values: Deque[AugmentedValue] = collections.deque( maxlen=None if all_updates else 1 ) self.__future: Optional[asyncio.Future] = None self.__lock = threading.Lock() # Used for update merging. self.__is_async: Optional[bool] = None # If events not specified then compute appropriate default corresponding # to the requested format. if events is None: events = self.__default_events[format] # Trigger channel connection if channel not already known. = get_channel(name) self.__call_soon_threadsafe = _Context._channel_caches[ self.__event_loop ].call_soon_threadsafe # Spawn the actual task of creating the subscription into the # background, as we may have to wait for the channel to become # connected. self.state = self.OPENING self.__task = asyncio.ensure_future( self.__create_subscription(events, datatype, format, count, connect_timeout) ) @staticmethod @cadef.event_handler def __on_event(args) -> None: # pragma: no cover """This is called each time the subscribed value changes. As this is called asynchronously, a signal must be queued for later dispatching to the monitoring user.""" self: Subscription = args.usr try: assert ( args.status == cadef.ECA_NORMAL ), f"Subscription {} got bad status {args.status}" # Good data: extract value from the dbr. Note that this can fail value = self.dbr_to_value(args.raw_dbr, args.type, args.count) self.__queue_value(value) self.__call_soon_threadsafe(self.__signal) except Exception: # Something went wrong, report and close the connection self.__call_soon_threadsafe(self.close, sys.exc_info()) def __queue_value(self, value) -> None: # pragma: no cover """Threadsafe append to the queue""" with self.__lock: # Need to lock to make sure __signal code below doesn't # consume self.__values before we can increment dropped_callbacks if self.pending_values.maxlen and self.pending_values: # already full and we are going to overwrite it, so # increment the dropped values counter self.dropped_callbacks += 1 self.pending_values.append(value) def __signal(self) -> None: """Wrapper for performing callbacks safely: only performs the callback if the subscription is open and reports and handles any exceptions that might arise.""" if self.state != self.CLOSED: try: if self.__is_async is False: # The first update was not async, assume it will always not # be async and handle it here while self.pending_values: with self.__lock: value = self.pending_values.popleft() self.callback(value) else: # Signal to __create_subscription task to handle it if self.__future: self.__future.set_result(True) self.__future = None except Exception: # We try and be robust about exceptions in handlers, but to # prevent a perpetual storm of exceptions, we close the # subscription after reporting the problem. self.close(sys.exc_info()) def _on_connect(self, connected: bool): """This is called each time the connection state of the underlying channel changes. It is called synchronously.""" if not connected and self.notify_disconnect: # Channel has become disconnected: tell the subscriber. self.__queue_value(CANothing(, cadef.ECA_DISCONN)) self.__signal()
[docs] def close(self, exc_info=None) -> None: """Closes the subscription and releases any associated resources. Note that no further callbacks will occur on a closed subscription, not even callbacks currently queued for execution.""" if exc_info: print( "Subscription %s callback raised exception" %, file=sys.stderr, ) traceback.print_exception(*exc_info) print("Subscription %s closed" %, file=sys.stderr) if self.state == self.OPEN: cadef.ca_clear_subscription(self) if not self.__event_loop.is_closed(): _flush_io() self.__task.cancel() self.state = self.CLOSED
async def __do_initial_callback(self, value): ret = self.callback(value) self.__is_async = inspect.isawaitable(ret) if self.__is_async: await ret async def __wait_for_channel(self, timeout): try: # Wait for channel to be connected await ca_timeout(,, timeout) except CANothing as e: # Connection timeout. Let the caller know and now just block # until we connect (if ever). Note that in this case the caller # is notified even if notify_disconnect=False is set. await self.__do_initial_callback(e) await async def __create_subscription( self, events, datatype, format, count, connect_timeout ): """Creates the channel subscription with the specified parameters: event mask, datatype and format, array count. Waits for the channel to become connected.""" try: # Need to first wait for the channel to connect before we can do # anything else. This will either succeed, or wait forever, raising # if close() is called await self.__wait_for_channel(connect_timeout) self.state = self.OPEN # Treat a negative count as a request for the complete data if count < 0: count = cadef.ca_element_count( # Connect to the channel to be kept informed of connection updates. # Convert the datatype request into the subscription datatype. dbrcode, self.dbr_to_value = dbr.type_to_dbr(, datatype, format) # Finally create the subscription with all the requested properties # and hang onto the returned event id as our implicit ctypes # parameter. event_id = ctypes.c_void_p() cadef.ca_create_subscription( dbrcode, count,, events, self.__on_event, ctypes.py_object(self), ctypes.byref(event_id), ) _flush_io() self._as_parameter_ = event_id.value # If we are async then we should be servicing the callbacks here while self.__is_async is not False: try: with self.__lock: value = self.pending_values.popleft() except IndexError: self.__future = self.__event_loop.create_future() await self.__future else: if self.__is_async is None: # This is the first callback so need to calculate is_async await self.__do_initial_callback(value) else: await self.callback(value) except asyncio.CancelledError: # This is allowed raise except Exception: # Unexpected exception, close the subscription self.close(sys.exc_info()) raise
@overload def camonitor( pv: str, callback: Callable[[Any], Union[None, Awaitable]], events: Dbe = ..., datatype: Datatype = ..., format: Format = ..., count: Count = ..., all_updates: bool = ..., notify_disconnect: bool = ..., connect_timeout: Timeout = ..., ) -> Subscription: ... # pragma: no cover @overload def camonitor( pv: PVs, callback: Callable[[Any, int], Union[None, Awaitable]], events: Dbe = ..., datatype: Datatype = ..., format: Format = ..., count: Count = ..., all_updates: bool = ..., notify_disconnect: bool = ..., connect_timeout: Timeout = ..., ) -> List[Subscription]: ... # pragma: no cover
[docs]def camonitor( pv, callback, events=None, datatype=None, format=dbr.FORMAT_RAW, count=0, all_updates=False, notify_disconnect=False, connect_timeout=None, ): """Create a subscription to one or more PVs Args: callback: Regular function or async function events: Bit-wise or of `Dbe` types to notify about. If not given the default mask depends on the requested format datatype: Override `Datatype` to a non-native type format: Request extra `Format` fields count: Request a specific element `Count` in an array all_updates: If True then every update received from channel access will trigger a callback, otherwise any updates received during the previous callback will be merged into the most recent value, incrementing `Subscription.dropped_callbacks` notify_disconnect: If True then IOC disconnect events will be reported by calling the callback with a `CANothing` error with .ok False, otherwise only valid values will be passed to the callback routine connect_timeout: If specified then the camonitor will report a disconnection event after the specified interval if connection has not completed by this time. Note that this notification will be made even if notify_disconnect is False, and that if the PV subsequently connects it will update as normal. Returns: `Subscription` for single PV or [`Subscription`] for a list of PVs """ kwargs = locals().copy() if isinstance(kwargs.pop("pv"), str): return Subscription(pv, **kwargs) else: def make_cb(index, cb=kwargs.pop("callback")): return lambda v: cb(v, index) subs = [Subscription(x, make_cb(i), **kwargs) for i, x in enumerate(pv)] return subs
# ---------------------------------------------------------------------------- # caget @cadef.event_handler def _caget_event_handler(args): # pragma: no cover """This will be called when a caget request completes, either with a brand new data value or with failure. The result is communicated back to the original caller.""" # We are called exactly once, so can consume the context right now. Note # that we have to do some manual reference counting on the user context, # as this is a python object that is invisible to the C api. pv, dbr_to_value, done, event_loop = args.usr ctypes.pythonapi.Py_DecRef(args.usr) if args.status == cadef.ECA_NORMAL: try: value = dbr_to_value(args.raw_dbr, args.type, args.count) except Exception as e: value = e else: value = CANothing(pv, args.status) event_loop.call_soon_threadsafe(done.set, value) @overload async def caget( pv: str, datatype: Datatype = ..., format: Format = ..., count: Count = ..., timeout: Timeout = ..., throw: bool = ..., ) -> AugmentedValue: ... # pragma: no cover @overload async def caget( pvs: PVs, datatype: Datatype = ..., format: Format = ..., count: Count = ..., timeout: Timeout = ..., throw: bool = ..., ) -> List[AugmentedValue]: ... # pragma: no cover
[docs]@maybe_throw async def caget(pv: str, datatype=None, format=dbr.FORMAT_RAW, count=0): """Retrieves an `AugmentedValue` from one or more PVs. Args: datatype: Override `Datatype` to a non-native type format: Request extra `Format` fields count: Request a specific element `Count` in an array timeout: After how long should caget `Timeout` throw: If False then return `CANothing` instead of raising an exception Returns: `AugmentedValue` for single PV or [`AugmentedValue`] for a list of PVs """ # Note: docstring refers to both this function and caget_array below # Retrieve the requested channel and ensure it's connected. channel = get_channel(pv) await channel.wait() # A count of zero will be treated by EPICS in a version dependent manner, # either returning the entire waveform (equivalent to count=-1) or a data # dependent waveform length. if count < 0: # Treat negative count request as request for fixed underlying channel # size. count = cadef.ca_element_count(channel) elif count > 0: # Need to ensure we don't ask for more than the channel can provide as # otherwise may get API error. count = min(count, cadef.ca_element_count(channel)) # Assemble the callback context. Note that we need to explicitly # increment the reference count so that the context survives until the # callback routine gets to see it. dbrcode, dbr_to_value = dbr.type_to_dbr(channel, datatype, format) done = ValueEvent[AugmentedValue]() loop = asyncio.get_event_loop() context = (pv, dbr_to_value, done, loop) ctypes.pythonapi.Py_IncRef(context) # Perform the actual put as a non-blocking operation: we wait to be # informed of completion, or time out. cadef.ca_array_get_callback( dbrcode, count, channel, _caget_event_handler, ctypes.py_object(context) ) _flush_io() result = await done.wait() return result
@caget.register(list) # type: ignore @caget.register(tuple) # type: ignore async def caget_array(pvs: PVs, **kwargs): # Spawn a separate caget task for each pv: this allows them to complete # in parallel which can speed things up considerably. coros = [caget(pv, **parallel_timeout(kwargs)) for pv in pvs] results = await in_parallel(coros, kwargs) return results # ---------------------------------------------------------------------------- # caput @cadef.event_handler def _caput_event_handler(args): # pragma: no cover """Event handler for caput with callback completion. Returns status code to caller.""" # This is called exactly once when a caput request completes. Extract # our context information and discard the context immediately. pv, done, event_loop = args.usr ctypes.pythonapi.Py_DecRef(args.usr) if args.status == cadef.ECA_NORMAL: value = None else: value = CANothing(pv, args.status) event_loop.call_soon_threadsafe(done.set, value) @overload async def caput( pv: str, value, datatype: Datatype = ..., wait: bool = ..., timeout: Timeout = ..., throw: bool = ..., ) -> CANothing: ... # pragma: no cover @overload async def caput( pvs: PVs, values, repeat_value: bool = ..., datatype: Datatype = ..., wait: bool = ..., timeout: Timeout = ..., throw: bool = ..., ) -> List[CANothing]: ... # pragma: no cover
[docs]@maybe_throw async def caput(pv: str, value, datatype=None, wait=False): """Writes values to one or more PVs If a list of PVs is given, then normally value will have the same length and value[i] is written to pv[i]. If value is a scalar or repeat_value=True then the same value is written to all PVs. Args: repeat_value: If True and a list of PVs is given, write the same value to every PV. datatype: Override `Datatype` to a non-native type wait: Do a caput with callback, waiting for completion timeout: After how long should a caput with wait=True `Timeout` throw: If False then return `CANothing` instead of raising an exception Returns: `CANothing` for single PV or [`CANothing`] for a list of PVs """ # Connect to the channel and wait for connection to complete. channel = get_channel(pv) await channel.wait() # Note: the unused value returned below needs to be retained so that # dbr_array, a pointer to C memory, has the right lifetime: it has to # survive until ca_array_put[_callback] has been called. dbrtype, count, dbr_array, value = dbr.value_to_dbr(channel, datatype, value) if wait: # Assemble the callback context and give it an extra reference count # to keep it alive until the callback handler sees it. done = ValueEvent[None]() context = (pv, done, asyncio.get_event_loop()) ctypes.pythonapi.Py_IncRef(context) # caput with callback requested: need to wait for response from # server before returning. cadef.ca_array_put_callback( dbrtype, count, channel, dbr_array, _caput_event_handler, ctypes.py_object(context), ) _flush_io() await done.wait() else: # Asynchronous caput, just do it now. cadef.ca_array_put(dbrtype, count, channel, dbr_array) _flush_io() # Return a success code for compatibility with throw=False code. return CANothing(pv)
@caput.register(list) # type: ignore @caput.register(tuple) # type: ignore async def caput_array(pvs: PVs, values, repeat_value=False, **kwargs): # Bring the arrays of pvs and values into alignment. if repeat_value or isinstance(values, str): # If repeat_value is requested or the value is a string then we treat # it as a single value. values = [values] * len(pvs) else: try: values = list(values) except TypeError: # If the value can't be treated as a list then again we treat it # as a single value values = [values] * len(pvs) assert len(pvs) == len(values), "PV and value lists must match in length" coros = [ caput(pv, value, **parallel_timeout(kwargs)) for pv, value in zip(pvs, values) ] results = await in_parallel(coros, kwargs) return results # ---------------------------------------------------------------------------- # connect
[docs]class CAInfo: """Object representing the information returned from `cainfo`""" #: Converts `state` into a printable description of the connection state. state_strings = ["never connected", "previously connected", "connected", "closed"] #: Textual descriptions of the possible channel data types, can be #: used to convert `datatype` into a printable string datatype_strings = [ "string", "short", "float", "enum", "char", "long", "double", "no access", ] def __init__(self, pv: str, channel: Channel): #: True iff the channel was successfully connected self.ok: bool = True #: The name of the PV str = pv #: State of channel as an integer. Look up ``state_strings[state]`` #: for textual description. self.state: int = cadef.ca_state(channel) #: Host name and port of server providing this PV str = cadef.ca_host_name(channel) #: True iff read access to this PV bool = cadef.ca_read_access(channel) #: True iff write access to this PV self.write: bool = cadef.ca_write_access(channel) if self.state == cadef.cs_conn: #: Data count of this channel self.count: int = cadef.ca_element_count(channel) #: Underlying channel datatype as `Dbr` value. Look up #: ``datatype_strings[datatype]`` for textual description. self.datatype: int = cadef.ca_field_type(channel) else: self.count = 0 self.datatype = 7 # DBF_NO_ACCESS def __str__(self): return """%s: State: %s Host: %s Access: %s, %s Data type: %s Count: %d""" % (, self.state_strings[self.state],,, self.write, self.datatype_strings[self.datatype], self.count, )
@overload async def connect( pv: str, wait: bool = ..., timeout: Timeout = ..., throw: bool = ... ) -> CANothing: ... # pragma: no cover @overload async def connect( pv: PVs, wait: bool = ..., timeout: Timeout = ..., throw: bool = ... ) -> List[CANothing]: ... # pragma: no cover
[docs]@maybe_throw async def connect(pv: str, wait=True): """Establishes a connection to one or more PVs A single PV or a list of PVs can be given. This does not normally need to be called, as the ca...() routines will establish their own connections as required, but after a successful connection we can guarantee that caput(..., wait=False) will complete immediately without suspension. This routine can safely be called repeatedly without any extra side effects. Args: wait: If False then queue a connection without waiting for completion timeout: After how long should the connect with wait=True `Timeout` throw: If False then return `CANothing` instead of raising an exception Returns: `CANothing` for single PV or [`CANothing`] for a list of PVs """ channel = get_channel(pv) if wait: await channel.wait() return CANothing(pv)
@connect.register(list) # type: ignore @connect.register(tuple) # type: ignore async def connect_array(pvs: PVs, wait=True, **kwargs): coros = [connect(pv, **parallel_timeout(kwargs)) for pv in pvs] results = await in_parallel(coros, kwargs) return results @overload async def cainfo( pv: str, wait: bool = ..., timeout: Timeout = ..., throw: bool = ... ) -> CAInfo: ... # pragma: no cover @overload async def cainfo( pv: PVs, wait: bool = ..., timeout: Timeout = ..., throw: bool = ... ) -> List[CAInfo]: ... # pragma: no cover
[docs]@maybe_throw async def cainfo(pv: str, wait=True): """Returns a `CAInfo` structure for the given PVs. See the documentation for `connect()` for details of arguments. """ channel = get_channel(pv) if wait: await channel.wait() return CAInfo(pv, channel)
@cainfo.register(tuple) # type: ignore @cainfo.register(list) # type: ignore async def cainfo_array(pvs: PVs, wait=True, **kwargs): coros = [cainfo(pv, **parallel_timeout(kwargs)) for pv in pvs] results = await in_parallel(coros, kwargs) return results # ---------------------------------------------------------------------------- # CA Context manager class _Context: _ca_available = False _ca_context = None _channel_caches: Dict[asyncio.AbstractEventLoop, ChannelCache] = {} @classmethod def purge_channel_caches(cls): """Remove cached channel connections. This will close all subscriptions""" for channel_cache in cls._channel_caches.values(): channel_cache.purge() cls._channel_caches.clear() @classmethod def _ensure_context(cls): if not cls._ca_available: if not cadef.ca_current_context(): # There is no CA context available for us to use, make one cadef.ca_context_create(1) # EPICS Channel Access event dispatching needs to done with a little # care. In previous versions the solution was to repeatedly call # ca_pend_event() in polling mode, but this does not appear to be # efficient enough when receiving large amounts of data. Instead we # enable preemptive Channel Access callbacks, which means we need to # cope with all of our channel access events occuring # asynchronously. cls._ca_context = cadef.ca_current_context() cls._ca_available = True atexit.register(cls._destroy_context) @classmethod def _destroy_context(cls): # pragma: no cover # On exit we do our best to ensure that channel access shuts down cleanly. # We do this by shutting down all channels and clearing the channel access # context: this should reduce the risk of unexpected errors during # application exit. # One reason that it's rather important to do this properly is that we # can't safely do *any* ca_ calls once ca_context_destroy() is called! cls.purge_channel_caches() if cls._ca_context: # Make sure we are attached to that context cadef.ca_detach_context() cadef.ca_attach_context(cls._ca_context) # Then destroy it cadef.ca_context_destroy() cadef.ca_detach_context() cls._ca_context = None cls._ca_available = False @classmethod def get_channel_cache(cls): cls._ensure_context() loop = asyncio.get_event_loop() try: channel_cache = cls._channel_caches[loop] except KeyError: # Channel from new event loop. Don't support multiple event loops, so # clear out all the old channels cls.purge_channel_caches() channel_cache = ChannelCache(loop) cls._channel_caches[loop] = channel_cache return channel_cache purge_channel_caches = _Context.purge_channel_caches def get_channel(pv: str) -> Channel: channel_cache = _Context.get_channel_cache() channel = channel_cache.get_channel(pv) return channel
[docs]class ChannelInfo: """Information about a particular Channel Attributes: name: Process Variable name the Channel is targeting connected: True if the Channel is currently connected subscriber_count: Number of clients subscribed to this Channel""" name: str connected: bool subscriber_count: int def __init__(self, name, connected, subscriber_count): = name self.connected = connected self.subscriber_count = subscriber_count
[docs]def get_channel_infos() -> List[ChannelInfo]: """Return information about all Channels""" return [ ChannelInfo(, channel.connected(), channel.count_subscriptions()) for channel in _Context.get_channel_cache().get_channels() ]
# Another delicacy arising from relying on asynchronous CA event dispatching is # that we need to manually flush IO events such as caget commands. To ensure # that large blocks of channel access activity really are aggregated we used to # ensure that ca_flush_io() is only called once in any scheduling cycle, but now # we just call it every time _flush_io = cadef.ca_flush_io # ---------------------------------------------------------------------------- # Helper function for running async code.
[docs]def run(coro, forever=False): """Convenience function that makes an event loop and runs the async function within it. Args: forever: If True then run the event loop forever, otherwise return on completion of the coro """ loop = asyncio.get_event_loop() t = None try: if forever: t = asyncio.ensure_future(coro, loop=loop) loop.run_forever() else: return loop.run_until_complete(coro) finally: if t: t.cancel() loop.stop() loop.close()