Estuary API service client.
Usage:
estuary_endpoint = "https://api.estuary.tech"
estuary_key = "ESTbb693fa8-d758-48ce-9843-a8acadb98a53ARY" # fake key
estuary = Estuary(estuary_endpoint, estuary_key)
pin = estuary.pin(stored_object)
removed_cid = estuary.unpin(pin.cid)
Source code in nucleus/sdk/storage/services.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111 | @dataclass
class Estuary:
    """Estuary API service client.

    Usage:

        estuary_endpoint = "https://api.estuary.tech"
        estuary_key = "ESTbb693fa8-d758-48ce-9843-a8acadb98a53ARY" # fake key

        estuary = Estuary(estuary_endpoint, estuary_key)
        pin = estuary.pin(stored_object)
        removed_cid = estuary.unpin(pin.cid)
    """

    endpoint: URL
    key: str

    def __post_init__(self):
        # All requests go through the session returned by
        # http_client.live_session for the configured endpoint; the bearer
        # token and JSON content type are attached to every request.
        self._http = http_client.live_session(self.endpoint)
        self._http.headers.update(
            {
                'Authorization': f'Bearer {self.key}',
                'Content-Type': 'application/json',
            }
        )

    def _safe_request(self, res: Response) -> JSON:
        """Helper method to handle a response from the Estuary API

        :param res: Expected response
        :return: Json response (empty when a successful response has no body)
        :raises StorageServiceError: If an error occurs during request
        """
        # Decode defensively: the body may be empty or not JSON at all, and
        # that must not hide the real outcome behind a decoding error.
        try:
            payload = res.json()
            is_json = True
        except ValueError:
            payload, is_json = None, False

        if not res.ok:
            # Observable error payloads, both tolerated here:
            #   {"error": {"code": 0, "details": "string", "reason": "string"}}
            #   {"code": 0, "details": "string", "reason": "string"}
            error = payload.get('error', payload) if isinstance(payload, dict) else None
            details = error.get('details') if isinstance(error, dict) else None
            # Fall back to the raw body, then to the status code, so the
            # exception always carries something useful.
            error_description = details or res.text or f'status code {res.status_code}'
            raise StorageServiceError(f'exception raised during request: {error_description}')

        if not is_json:
            # NOTE(review): a successful call may legitimately carry no body
            # (e.g. an "accepted" reply to a delete) - confirm against Estuary.
            if res.text.strip():
                raise StorageServiceError('exception raised during request: response is not valid JSON')
            payload = {}

        return JSON(payload)

    def _content_by_cid(self, cid: CID) -> JSON:
        """Collect details from estuary based on CID

        :param cid: Cid to retrieve content details
        :return: Cid content details, empty if the response has no 'content'
        :raises StorageServiceError: If the request fails
        """
        content_uri = f'{ESTUARY_API_PUBLIC}/by-cid/{cid}'
        req = self._http.get(content_uri)
        # expected response as json
        response = self._safe_request(req)
        return response.get('content', {})

    def pin(self, obj: Object) -> Pin:
        """Pin cid into estuary

        :param obj: Object to pin
        :return: Pin object
        :raises StorageServiceError: If pin request fails
        """
        # https://docs.estuary.tech/Reference/SwaggerUI#/pinning/post_pinning_pins
        data = str(JSON({'cid': obj.hash, 'name': obj.name, 'meta': {}, 'origins': []}))
        req = self._http.post(ESTUARY_API_PIN, data=data)
        json_response = self._safe_request(req)

        # The status is always reported as 'pending' here, whatever the
        # response says.
        return Pin(
            cid=json_response['cid'],
            name=json_response['name'],
            status='pending',
        )

    def unpin(self, cid: CID) -> CID:
        """Remove pin from estuary

        :param cid: The cid to remove from service
        :return: The recently removed CID
        :raises StorageServiceError: If the cid has no pin or the request fails
        """
        # content id is same as pin id
        pin_id = self._content_by_cid(cid).get('id')
        if pin_id is None:
            # Without an id the delete below would target ".../None".
            raise StorageServiceError(f'exception raised during request: no pinned content found for cid {cid}')

        response = self._http.delete(f'{ESTUARY_API_PIN}/{pin_id}')
        # If error happens then raise standard exception.
        self._safe_request(response)
        return cid
|
pin(obj)
Pin cid into estuary
Parameters:
Name |
Type |
Description |
Default |
obj |
Object
|
Object to pin |
required
|
Returns:
Type |
Description |
Pin
|
Pin object |
Raises:
Type |
Description |
StorageServiceError
|
If pin request fails |
Source code in nucleus/sdk/storage/services.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def pin(self, obj: Object) -> Pin:
    """Ask estuary to pin the given object.

    Posts the object's hash and name to the pinning endpoint and builds a
    Pin from the response. The returned status is always 'pending'.

    :param obj: Object to pin
    :return: Pin object
    :raises StorageServiceError: If pin request fails
    """
    # https://docs.estuary.tech/Reference/SwaggerUI#/pinning/post_pinning_pins
    payload = JSON({'cid': obj.hash, 'name': obj.name, 'meta': {}, 'origins': []})
    res = self._http.post(ESTUARY_API_PIN, data=str(payload))
    pinned = self._safe_request(res)

    cid, name = pinned['cid'], pinned['name']
    return Pin(cid=cid, name=name, status='pending')
|
unpin(cid)
Remove pin from estuary
Parameters:
Name |
Type |
Description |
Default |
cid |
CID
|
The cid to remove from service |
required
|
Returns:
Type |
Description |
CID
|
The recently removed CID |
Raises:
Type |
Description |
StorageServiceError
|
if an error occurs during request |
Source code in nucleus/sdk/storage/services.py
98
99
100
101
102
103
104
105
106
107
108
109
110
def unpin(self, cid: CID) -> CID:
    """Remove pin from estuary

    :param cid: The cid to remove from service
    :return: The recently removed CID
    :raises StorageServiceError: If the cid has no pin or the request fails
    """
    # content id is same as pin id
    pin_id = self._content_by_cid(cid).get('id')
    if pin_id is None:
        # The lookup may return content details without an 'id'; without
        # this guard the delete below would target ".../None".
        raise StorageServiceError(f'exception raised during request: no pinned content found for cid {cid}')

    response = self._http.delete(f'{ESTUARY_API_PIN}/{pin_id}')
    # If error happens then raise standard exception.
    self._safe_request(response)
    return cid
|