352 lines
9.9 KiB
Python
352 lines
9.9 KiB
Python
import click
|
|
import requests
|
|
import json
|
|
import os
|
|
import output
|
|
|
|
import importlib.metadata
|
|
|
|
# Values accepted by the `--status` option of the todo `comment` command.
TICKET_STATUS = [
    "reported", "confirmed", "in_progress", "pending", "resolved",
]

# Values accepted by the `--resolution` option of the todo `comment` command.
TICKET_RESOLUTION = [
    "unresolved", "fixed", "implemented", "wont_fix",
    "by_design", "invalid", "duplicate", "not_our_bug",
]

# Values accepted for the `visibility` argument of the paste `submit` command.
PASTE_VISIBILITY = ["public", "private", "unlisted"]
|
|
|
|
# Defaults taken from the environment. They are read with .get() so that a
# missing variable does not abort the program at import time; the `--token`
# and `--user` options of the root command can still supply the values.
# An empty token simply produces an unauthenticated request.
access_token = os.environ.get("SOURCEHUT_CLI_ACCESS_TOKEN", "")
# User names are written with a leading "~" in the API paths built below.
username = "~" + os.environ.get("USER", "")
# Filled in by the service group callback (paste / git / todo).
base_url = ""
# Shared request headers; the root command adds the Authorization entry.
headers = {'User-Agent': "~thrrgilag srht-cli"}
|
|
|
|
@click.group()
@click.option('--user')
@click.option('--token')
@click.version_option(version='0.2.1')
def cli(user, token):
    # Root command group. Values given on the command line replace the
    # module-level token and user name; the Authorization header used by
    # every request is then filled in from the resulting token.
    global access_token, username
    if token is not None:
        access_token = token
    if user is not None:
        username = f"~{user}"
    # `headers` is only mutated, never rebound, so no `global` is needed.
    headers['Authorization'] = f"Bearer {access_token}"
|
|
|
|
@cli.group()
def paste():
    '''sourcehut paste hosting service'''
    # Select the paste service as the API root for this group's commands.
    global base_url
    base_url = "https://paste.sr.ht/api"
|
|
|
|
@paste.command("list")
def get_pastes():
    '''List pastes'''
    # GET <base_url>/pastes and render the 'results' list on success.
    response = requests.get(f"{base_url}/pastes", headers=headers)
    if response.status_code != 200:
        # Anything but 200: dump the raw status and body.
        print(response.status_code)
        print(response.text)
        return
    output.show_pastes(response.json()['results'])
|
|
|
|
@paste.command("show")
@click.argument('sha')
def show_paste(sha):
    '''Show details of a paste'''
    # Fetches the paste record, then the blob behind each of its files, and
    # hands the record (with a 'blobs' list attached) to output.show_paste.
    r = requests.get(base_url + f"/pastes/{sha}", headers=headers)
    if r.status_code != 200:
        print(r.status_code)
        print(r.text)
        return

    details = r.json()
    blobs = []
    for entry in details['files']:
        blob_url = base_url + f"/blobs/{entry['blob_id']}"
        br = requests.get(blob_url, headers=headers)
        if br.status_code != 200:
            # Keep going so the rest of the paste is still shown, but report
            # the missing blob on stderr instead of dropping it silently.
            click.echo(
                f"warning: could not fetch blob {entry['blob_id']} "
                f"(HTTP {br.status_code})",
                err=True,
            )
            continue
        blobs.append({
            'id': entry['blob_id'],
            'filename': entry['filename'],
            'content': br.json()['contents'],
        })
    details['blobs'] = blobs

    output.show_paste(details)
|
|
|
|
@paste.command("blob")
@click.argument('sha')
def show_blob(sha):
    '''Show paste blob'''
    # GET <base_url>/blobs/<sha> and print the blob's 'contents' field.
    response = requests.get(f"{base_url}/blobs/{sha}", headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print(response.text)
        return
    print(response.json()['contents'])
|
|
|
|
@paste.command("submit")
@click.argument('visibility', type=click.Choice(PASTE_VISIBILITY))
@click.option('--filename')
def submit_paste(visibility, filename):
    '''Submit a new paste'''
    # The paste body is typed in the user's editor; a None or empty result
    # means there is nothing to submit.
    content = click.edit()
    if not content:
        print("nothing saved, aborting...")
        return

    # A paste carries a list of files; this command always sends exactly one.
    file_entry = {'contents': content}
    if filename is not None:
        file_entry['filename'] = filename
    payload = {'visibility': visibility, 'files': [file_entry]}

    r = requests.post(f"{base_url}/pastes", headers=headers, json=payload)
    if r.status_code != 201:
        print(r.status_code)
        print(r.text)
        return

    # On success print the paste URL followed by one URL per stored blob.
    info = r.json()
    print(f"https://paste.sr.ht/{info['user']['canonical_name']}/{info['sha']}")
    for stored in info['files']:
        print(f"https://paste.sr.ht/blob/{stored['blob_id']}")
|
|
|
|
@paste.command("delete")
@click.argument('sha')
def delete_paste(sha):
    '''Delete a paste'''
    # DELETE <base_url>/pastes/<sha>; the raw status and body are always printed.
    response = requests.delete(f"{base_url}/pastes/{sha}", headers=headers)
    print(response.status_code)
    print(response.text)
|
|
|
|
@cli.group()
def git():
    '''sourcehut git hosting service'''
    # Select the git service as the API root for this group's commands.
    global base_url
    base_url = "https://git.sr.ht/api"
|
|
|
|
@git.command("repos")
@click.option('--verbose', '-v', is_flag=True)
def get_repos(verbose):
    '''List repositories'''
    # GET <base_url>/<username>/repos; `verbose` is passed through to the
    # output helper unchanged.
    response = requests.get(f"{base_url}/{username}/repos", headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print(response.text)
        return
    output.show_repos(response.json()['results'], verbose)
|
|
|
|
@git.command("webhooks")
@click.argument('repo')
def get_git_webhooks(repo):
    '''List webhooks for a repository'''
    # GET the repository's webhooks and pretty-print the JSON response.
    endpoint = f"{base_url}/{username}/repos/{repo}/webhooks"
    response = requests.get(endpoint, headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print(response.text)
        return
    print(json.dumps(response.json(), indent=4))
|
|
|
|
@git.command("add-webhook")
@click.argument('repo')
@click.argument('url')
@click.option('--event', required=True, multiple=True)
def add_git_webhook(repo, url, event):
    '''Add webhook to a repository'''
    # `url` is the webhook target; the API endpoint gets its own name so the
    # two are not confused.
    payload = {'url': url, 'events': event}
    endpoint = f"{base_url}/{username}/repos/{repo}/webhooks"
    response = requests.post(endpoint, headers=headers, json=payload)
    print(response.status_code)
    print(response.text)
|
|
|
|
@git.command("del-webhook")
@click.argument('repo')
@click.argument('hookid')
def del_git_webhook(repo, hookid):
    '''Delete webhook from a repository'''
    # DELETE the webhook; the raw status and body are always printed.
    endpoint = f"{base_url}/{username}/repos/{repo}/webhooks/{hookid}"
    response = requests.delete(endpoint, headers=headers)
    print(response.status_code)
    print(response.text)
|
|
|
|
@cli.group()
def todo():
    '''sourcehut ticket tracking service'''
    # Select the todo service as the API root for this group's commands.
    global base_url
    base_url = "https://todo.sr.ht/api"
|
|
|
|
@todo.command("tickets")
@click.argument('tracker')
@click.option('--closed', is_flag=True)
def show_tickets(tracker, closed):
    '''List tickets on a tracker'''
    # GET the tracker's tickets; `closed` is passed through to the output
    # helper unchanged.
    endpoint = f"{base_url}/user/{username}/trackers/{tracker}/tickets"
    response = requests.get(endpoint, headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print(response.text)
        return
    output.show_tickets(response.json()['results'], closed)
|
|
|
|
@todo.command("submit")
@click.argument('tracker')
def submit_ticket(tracker):
    '''Submit new ticket on a tracker'''
    # The editor text is read as: first line = title, remainder = description.
    issue = click.edit()
    if issue is None or not issue.strip():
        print("nothing saved, aborting...")
        return

    # partition() never raises, whereas unpacking split('\n', 1) fails with
    # ValueError when the text is a single line without a newline.
    title, _, description = issue.partition('\n')
    description = description.lstrip()
    if not title.strip():
        print("ticket title (first line) is empty, aborting...")
        return

    url = base_url + f"/user/{username}/trackers/{tracker}/tickets"
    payload = {'title': title, 'description': description}
    r = requests.post(url, headers=headers, json=payload)
    print(r.status_code)
    print(r.text)
|
|
|
|
@todo.command()
@click.argument('tracker')
@click.argument('ticketid')
@click.option('--status', type=click.Choice(TICKET_STATUS))
@click.option('--resolution', type=click.Choice(TICKET_RESOLUTION))
@click.option('--label', multiple=True)
def comment(tracker, ticketid, status, resolution, label):
    '''Comment, label, update ticket status'''
    # Builds a PUT payload from whichever of comment text, status, labels and
    # resolution were supplied, and aborts when none were.
    payload = {}

    # Local is named `text` so it does not shadow this function's own name.
    text = click.edit()
    if text:
        payload["comment"] = text

    if status is not None:
        payload["status"] = status

    # With multiple=True click passes an empty tuple, not None, when the
    # option is absent. Testing truthiness keeps an unrequested "labels" key
    # out of the payload and lets the empty-payload check below work.
    # NOTE(review): an empty labels list may clear existing labels
    # server-side - confirm against the API.
    if label:
        payload["labels"] = label

    if resolution is not None:
        # The legacy API is a bit odd and only works to set the status and
        # resolution together to close or re-open. Status otherwise seems
        # irrelevant.
        if status != "reported":
            payload["status"] = "resolved"
        payload["resolution"] = resolution

    if not payload:
        print("nothing saved, aborting...")
        return

    print(payload)
    url = base_url + f"/user/{username}/trackers/{tracker}/tickets/{ticketid}"
    r = requests.put(url, headers=headers, json=payload)
    print(r.status_code)
    print(r.text)
|
|
|
|
@todo.command("show")
@click.argument('tracker')
@click.argument('ticketid')
def show_ticket(tracker, ticketid):
    '''Show details of a ticket on a tracker'''
    ticket_url = f"{base_url}/user/{username}/trackers/{tracker}/tickets/{ticketid}"

    # First request: the ticket itself. Stop here if it cannot be fetched.
    ticket = requests.get(ticket_url, headers=headers)
    if ticket.status_code != 200:
        print(ticket.status_code)
        print(ticket.text)
        return
    output.show_ticket(ticket.json())

    # Second request: the ticket's events, rendered as comments.
    events = requests.get(f"{ticket_url}/events", headers=headers)
    if events.status_code != 200:
        print(events.status_code)
        print(events.text)
        return
    output.show_comments(events.json()['results'])
|
|
|
|
@todo.command("trackers")
@click.option('--verbose', '-v', is_flag=True)
def get_trackers(verbose):
    '''List trackers'''
    # GET the user's trackers; `verbose` is passed through to the output
    # helper unchanged.
    endpoint = f"{base_url}/user/{username}/trackers"
    response = requests.get(endpoint, headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print(response.text)
        return
    output.show_trackers(response.json()['results'], verbose)
|
|
|
|
@todo.command("labels")
@click.argument('tracker')
def get_labels(tracker):
    '''List labels for a tracker'''
    # GET the tracker's labels and render the 'results' list on success.
    endpoint = f"{base_url}/user/{username}/trackers/{tracker}/labels"
    response = requests.get(endpoint, headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print(response.text)
        return
    output.show_labels(response.json()['results'])
|
|
|
|
@todo.command("webhooks")
@click.argument('tracker')
def get_tracker_webhooks(tracker):
    '''List webhooks for a tracker'''
    # GET the tracker's webhooks and pretty-print the JSON response.
    endpoint = f"{base_url}/user/{username}/trackers/{tracker}/webhooks"
    response = requests.get(endpoint, headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print(response.text)
        return
    print(json.dumps(response.json(), indent=4))
|
|
|
|
@todo.command("del-webhook")
@click.argument('tracker')
@click.argument('hookid')
def del_tacker_webhook(tracker, hookid):
    '''Delete webhook from a tracker'''
    # DELETE the webhook; the raw status and body are always printed.
    endpoint = f"{base_url}/user/{username}/trackers/{tracker}/webhooks/{hookid}"
    response = requests.delete(endpoint, headers=headers)
    print(response.status_code)
    print(response.text)
|
|
|
|
@todo.command("add-webhook")
@click.argument('tracker')
@click.argument('url')
@click.option('--event', required=True, multiple=True)
def add_tacker_webhook(tracker, url, event):
    '''Add webhook to a tracker'''
    # `url` is the webhook target; the API endpoint gets its own name so the
    # two are not confused.
    payload = {'url': url, 'events': event}
    endpoint = f"{base_url}/user/{username}/trackers/{tracker}/webhooks"
    response = requests.post(endpoint, headers=headers, json=payload)
    print(response.status_code)
    print(response.text)
|
|
|