Package apiclient :: Module channel
[hide private]
[frames] | no frames]

Source Code for Module apiclient.channel

  1  """Channel notifications support. 
  2   
  3  Classes and functions to support channel subscriptions and notifications 
  4  on those channels. 
  5   
  6  Notes: 
  7    - This code is based on experimental APIs and is subject to change. 
  8    - Notification does not do deduplication of notification ids, that's up to 
  9      the receiver. 
 10    - Storing the Channel between calls is up to the caller. 
 11   
 12   
 13  Example setting up a channel: 
 14   
 15    # Create a new channel that gets notifications via webhook. 
 16    channel = new_webhook_channel("https://example.com/my_web_hook") 
 17   
 18    # Store the channel, keyed by 'channel.id'. Store it before calling the 
 19    # watch method because notifications may start arriving before the watch 
 20    # method returns. 
 21    ... 
 22   
 23    resp = service.objects().watchAll( 
 24      bucket="some_bucket_id", body=channel.body()).execute() 
 25    channel.update(resp) 
 26   
 27    # Store the channel, keyed by 'channel.id'. Store it after being updated 
 28    # since the resource_id value will now be correct, and that's needed to 
 29    # stop a subscription. 
 30    ... 
 31   
 32   
 33  An example Webhook implementation using webapp2. Note that webapp2 puts 
 34  headers in a case insensitive dictionary, as headers aren't guaranteed to 
 35  always be upper case. 
 36   
 37    id = self.request.headers[X_GOOG_CHANNEL_ID] 
 38   
 39    # Retrieve the channel by id. 
 40    channel = ... 
 41   
 42    # Parse notification from the headers, including validating the id. 
 43    n = notification_from_headers(channel, self.request.headers) 
 44   
 45    # Do app specific stuff with the notification here. 
 46    if n.state == 'sync': 
 47      # Code to handle sync state. 
 48    elif n.state == 'exists': 
 49      # Code to handle the exists state. 
 50    elif n.state == 'not_exists': 
 51      # Code to handle the not exists state. 
 52   
 53   
 54  Example of unsubscribing. 
 55   
 56    service.channels().stop(body=channel.body()).execute() 
 57  """ 
 58   
 59  import datetime 
 60  import uuid 
 61   
 62  from apiclient import errors 
 63  from oauth2client import util 
 64   
 65   
 66  # The unix time epoch starts at midnight 1970. 
 67  EPOCH = datetime.datetime.utcfromtimestamp(0) 
 68   
 69  # Map the names of the parameters in the JSON channel description to 
 70  # the parameter names we use in the Channel class. 
 71  CHANNEL_PARAMS = { 
 72      'address': 'address', 
 73      'id': 'id', 
 74      'expiration': 'expiration', 
 75      'params': 'params', 
 76      'resourceId': 'resource_id', 
 77      'resourceUri': 'resource_uri', 
 78      'type': 'type', 
 79      'token': 'token', 
 80      } 
 81   
 82  X_GOOG_CHANNEL_ID     = 'X-GOOG-CHANNEL-ID' 
 83  X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER' 
 84  X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE' 
 85  X_GOOG_RESOURCE_URI   = 'X-GOOG-RESOURCE-URI' 
 86  X_GOOG_RESOURCE_ID    = 'X-GOOG-RESOURCE-ID' 
def _upper_header_keys(headers):
  """Return a new dict with every header name converted to upper case.

  Header names may arrive in any letter case, while the X_GOOG_* constants
  used for lookups are upper case, so keys are normalized before use.

  Args:
    headers: dict, A dictionary like object mapping header names to values.
      It is not modified.

  Returns:
    A plain dict with the same values, keyed by the upper-cased names.
  """
  # items() exists on both Python 2 and Python 3 mappings; the previous
  # iteritems() call is Python 2 only and fails on Python 3.
  return {name.upper(): value for name, value in headers.items()}
94
class Notification(object):
  """A single notification delivered over a Channel.

  Instances are normally produced by helpers such as
  notification_from_headers() rather than constructed by hand.

  Attributes:
    message_number: int, The unique id number of this notification.
    state: str, The state of the resource being monitored.
    resource_uri: str, The address of the resource being monitored.
    resource_id: str, The identifier of the watched resource.
  """

  @util.positional(5)
  def __init__(self, message_number, state, resource_uri, resource_id):
    """Store the fields of a notification.

    Args:
      message_number: int, The unique id number of this notification.
      state: str, The state of the resource being monitored. Can be one
        of "exists", "not_exists", or "sync".
      resource_uri: str, The address of the resource being monitored.
      resource_id: str, The identifier of the watched resource.
    """
    self.message_number, self.state = message_number, state
    self.resource_uri, self.resource_id = resource_uri, resource_id
124
class Channel(object):
  """A Channel for notifications.

  Usually not constructed directly, instead it is returned from helper
  functions like new_webhook_channel().

  Attributes:
    type: str, The type of delivery mechanism used by this channel. For
      example, 'web_hook'.
    id: str, A UUID for the channel.
    token: str, An arbitrary string associated with the channel that
      is delivered to the target address with each event delivered
      over this channel.
    address: str, The address of the receiving entity where events are
      delivered. Specific to the channel type.
    expiration: int, The time, in milliseconds from the epoch, when this
      channel will expire.
    params: dict, A dictionary of string to string, with additional parameters
      controlling delivery channel behavior.
    resource_id: str, An opaque id that identifies the resource that is
      being watched. Stable across different API versions.
    resource_uri: str, The canonicalized ID of the watched resource.
  """

  @util.positional(5)
  def __init__(self, type, id, token, address, expiration=None,
               params=None, resource_id="", resource_uri=""):
    """Create a new Channel.

    In user code, this Channel constructor will not typically be called
    manually since there are functions for creating channels for each specific
    type with a more customized set of arguments to pass.

    Args:
      type: str, The type of delivery mechanism used by this channel. For
        example, 'web_hook'.
      id: str, A UUID for the channel.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each event delivered
        over this channel.
      address: str, The address of the receiving entity where events are
        delivered. Specific to the channel type.
      expiration: int, The time, in milliseconds from the epoch, when this
        channel will expire.
      params: dict, A dictionary of string to string, with additional parameters
        controlling delivery channel behavior.
      resource_id: str, An opaque id that identifies the resource that is
        being watched. Stable across different API versions.
      resource_uri: str, The canonicalized ID of the watched resource.
    """
    self.type = type
    self.id = id
    self.token = token
    self.address = address
    self.expiration = expiration
    self.params = params
    self.resource_id = resource_id
    self.resource_uri = resource_uri

  def body(self):
    """Build a body from the Channel.

    Constructs a dictionary that's appropriate for passing into watch()
    methods as the value of body argument. The 'id', 'token', 'type' and
    'address' keys are always present; 'params', 'resourceId', 'resourceUri'
    and 'expiration' are included only when the matching attribute is truthy.

    Returns:
      A dictionary representation of the channel.
    """
    result = {
        'id': self.id,
        'token': self.token,
        'type': self.type,
        'address': self.address
        }
    if self.params:
      result['params'] = self.params
    if self.resource_id:
      result['resourceId'] = self.resource_id
    if self.resource_uri:
      result['resourceUri'] = self.resource_uri
    if self.expiration:
      result['expiration'] = self.expiration

    return result

  def update(self, resp):
    """Update a channel with information from the response of watch().

    When a request is sent to watch() a resource, the response returned
    from the watch() request is a dictionary with updated channel information,
    such as the resource_id, which is needed when stopping a subscription.

    Only keys listed in CHANNEL_PARAMS are read, and a key whose value is
    missing or None leaves the corresponding attribute untouched.

    Args:
      resp: dict, The response from a watch() method.
    """
    # items() works on Python 2 and Python 3; iteritems() is Python 2 only.
    for json_name, param_name in CHANNEL_PARAMS.items():
      value = resp.get(json_name)
      if value is not None:
        setattr(self, param_name, value)
225
def notification_from_headers(channel, headers):
  """Build a validated Notification from webhook request headers.

  Header names are matched case-insensitively. The channel id carried in
  the headers must equal the id of the supplied channel.

  Args:
    channel: Channel, The channel that the notification is associated with.
    headers: dict, A dictionary like object that contains the request headers
      from the webhook HTTP request.

  Returns:
    A Notification object.

  Raises:
    errors.InvalidNotificationError if the channel id in the headers does
      not match channel.id.
    ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
    KeyError if one of the expected X-GOOG-* headers is missing.
  """
  normalized = _upper_header_keys(headers)
  received_id = normalized[X_GOOG_CHANNEL_ID]
  if channel.id != received_id:
    raise errors.InvalidNotificationError(
        'Channel id mismatch: %s != %s' % (channel.id, received_id))

  # Arguments are evaluated left to right, so the message number is parsed
  # before the remaining headers are looked up.
  return Notification(
      int(normalized[X_GOOG_MESSAGE_NUMBER]),
      normalized[X_GOOG_RESOURCE_STATE],
      normalized[X_GOOG_RESOURCE_URI],
      normalized[X_GOOG_RESOURCE_ID])
254
@util.positional(2)
def new_webhook_channel(url, token=None, expiration=None, params=None):
  """Create a new webhook Channel.

  The channel is given type 'web_hook' and a freshly generated random UUID
  as its id.

  Args:
    url: str, URL to post notifications to.
    token: str, An arbitrary string associated with the channel that
      is delivered to the target address with each notification delivered
      over this channel.
    expiration: datetime.datetime, A time in the future when the channel
      should expire. Can also be None if the subscription should use the
      default expiration. Note that different services may have different
      limits on how long a subscription lasts. Check the response from the
      watch() method to see the value the service has set for an expiration
      time. The value is measured against the naive EPOCH constant, so it
      should be a naive datetime expressed in UTC.
    params: dict, Extra parameters to pass on channel creation. Currently
      not used for webhook channels.

  Returns:
    A Channel whose expiration is an int number of milliseconds since the
    epoch, or 0 when no expiration was given or it lies before the epoch.
  """
  expiration_ms = 0
  if expiration:
    delta = expiration - EPOCH
    # Floor division keeps the result an int on Python 3, where "/" would
    # produce a float; on Python 2 the value is unchanged.
    expiration_ms = delta.microseconds // 1000 + (
        delta.seconds + delta.days * 24 * 3600) * 1000
    if expiration_ms < 0:
      expiration_ms = 0

  return Channel('web_hook', str(uuid.uuid4()),
                 token, url, expiration=expiration_ms,
                 params=params)
285