import importlib.metadata
import json
import os

import click
import requests

import output

# Values accepted by `todo comment --status`.
TICKET_STATUS = [
    "reported",
    "confirmed",
    "in_progress",
    "pending",
    "resolved",
]

# Values accepted by `todo comment --resolution`.
TICKET_RESOLUTION = [
    "unresolved",
    "fixed",
    "implemented",
    "wont_fix",
    "by_design",
    "invalid",
    "duplicate",
    "not_our_bug",
]

# Values accepted by `paste submit`.
PASTE_VISIBILITY = [
    "public",
    "private",
    "unlisted",
]

# Token and user default to the environment and may be overridden by the
# --token / --user options handled in cli().  They are read with .get() so
# that a missing variable does not abort the import before the command-line
# overrides have had a chance to apply.
access_token = os.environ.get("SOURCEHUT_CLI_ACCESS_TOKEN")
username = "~" + os.environ.get("USER", "")
# Set by whichever service group (paste / git / todo) is invoked.
base_url = ""
# Shared by every request; cli() adds the Authorization entry.
headers = {'User-Agent': "~thrrgilag srht-cli"}


def _print_response(r):
    # Dump the raw HTTP status code and body of a response.
    print(r.status_code)
    print(r.text)


def _require_username():
    # Fail early when neither $USER nor --user supplied a user name, since
    # the git and todo endpoints embed it in every URL.
    if username == "~":
        raise click.UsageError("no user name: set USER or pass --user")


# Root command group.  Applies the --user / --token overrides to the module
# globals and installs the bearer token into the shared request headers.
# Raises click.UsageError when no token is available from either source.
@click.group()
@click.option('--user',
              help="sourcehut user name without the leading ~ "
                   "(default: $USER)")
@click.option('--token',
              help="API access token "
                   "(default: $SOURCEHUT_CLI_ACCESS_TOKEN)")
@click.version_option(version='0.2.0')
def cli(user, token):
    global access_token
    global username
    if token is not None:
        access_token = token
    if user is not None:
        username = "~" + user
    if not access_token:
        raise click.UsageError(
            "no access token: set SOURCEHUT_CLI_ACCESS_TOKEN or pass --token")
    headers['Authorization'] = "Bearer " + access_token


# --------------------------------------------------------------------------
# paste.sr.ht
# --------------------------------------------------------------------------

@cli.group()
def paste():
    '''sourcehut paste hosting service'''
    global base_url
    base_url = "https://paste.sr.ht/api"


@paste.command("list")
def get_pastes():
    '''List pastes'''
    url = base_url + "/pastes"
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        output.show_pastes(r.json()['results'])
    else:
        _print_response(r)


@paste.command("show")
@click.argument('sha')
def show_paste(sha):
    '''Show details of a paste'''
    url = base_url + f"/pastes/{sha}"
    r = requests.get(url, headers=headers)
    if r.status_code != 200:
        _print_response(r)
        return

    details = r.json()
    blobs = []
    # The paste record only lists blob ids; fetch each blob's contents.
    for b in details['files']:
        burl = base_url + f"/blobs/{b['blob_id']}"
        br = requests.get(burl, headers=headers)
        if br.status_code != 200:
            # Report the failure instead of silently omitting the file.
            click.echo(
                f"failed to fetch blob {b['blob_id']}: {br.status_code}",
                err=True)
            continue
        blobs.append({
            'id': b['blob_id'],
            'filename': b['filename'],
            'content': br.json()['contents'],
        })
    details['blobs'] = blobs
    output.show_paste(details)


@paste.command("blob")
@click.argument('sha')
def show_blob(sha):
    '''Show paste blob'''
    url = base_url + f"/blobs/{sha}"
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        print(r.json()['contents'])
    else:
        _print_response(r)


# Named submit_paste so it no longer collides with the todo command's
# submit_ticket function; the command name ("submit") is unchanged.
@paste.command("submit")
@click.argument('visibility', type=click.Choice(PASTE_VISIBILITY))
@click.option('--filename')
def submit_paste(visibility, filename):
    '''Submit a new paste'''
    # click.edit() returns None when the editor is closed without saving.
    content = click.edit()
    if not content:
        print("nothing saved, aborting...")
        return
    url = base_url + "/pastes"
    blob = {'contents': content}
    if filename is not None:
        blob['filename'] = filename
    payload = {'visibility': visibility, 'files': [blob]}
    r = requests.post(url, headers=headers, json=payload)
    _print_response(r)


@paste.command("delete")
@click.argument('sha')
def delete_paste(sha):
    '''Delete a paste'''
    url = base_url + f"/pastes/{sha}"
    r = requests.delete(url, headers=headers)
    _print_response(r)


# --------------------------------------------------------------------------
# git.sr.ht
# --------------------------------------------------------------------------

@cli.group()
def git():
    '''sourcehut git hosting service'''
    global base_url
    _require_username()
    base_url = "https://git.sr.ht/api"


@git.command("repos")
@click.option('--verbose', '-v', is_flag=True)
def get_repos(verbose):
    '''List repositories'''
    url = base_url + f"/{username}/repos"
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        output.show_repos(r.json()['results'], verbose)
    else:
        _print_response(r)


@git.command("webhooks")
@click.argument('repo')
def get_git_webhooks(repo):
    '''List webhooks for a repository'''
    url = base_url + f"/{username}/repos/{repo}/webhooks"
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        print(json.dumps(r.json(), indent=4))
    else:
        _print_response(r)


@git.command("add-webhook")
@click.argument('repo')
@click.argument('url')
@click.option('--event', required=True, multiple=True)
def add_git_webhook(repo, url, event):
    '''Add webhook to a repository'''
    # `url` is the webhook target; the API endpoint gets its own name.
    payload = {'url': url, 'events': event}
    endpoint = base_url + f"/{username}/repos/{repo}/webhooks"
    r = requests.post(endpoint, headers=headers, json=payload)
    _print_response(r)


@git.command("del-webhook")
@click.argument('repo')
@click.argument('hookid')
def del_git_webhook(repo, hookid):
    '''Delete webhook from a repository'''
    url = base_url + f"/{username}/repos/{repo}/webhooks/{hookid}"
    r = requests.delete(url, headers=headers)
    _print_response(r)


# --------------------------------------------------------------------------
# todo.sr.ht
# --------------------------------------------------------------------------

@cli.group()
def todo():
    '''sourcehut ticket tracking service'''
    global base_url
    _require_username()
    base_url = "https://todo.sr.ht/api"


@todo.command("tickets")
@click.argument('tracker')
@click.option('--closed', is_flag=True)
def show_tickets(tracker, closed):
    '''List tickets on a tracker'''
    url = base_url + f"/user/{username}/trackers/{tracker}/tickets"
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        output.show_tickets(r.json()['results'], closed)
    else:
        _print_response(r)


@todo.command("submit")
@click.argument('tracker')
def submit_ticket(tracker):
    '''Submit new ticket on a tracker'''
    # click.edit() returns None when the editor is closed without saving.
    issue = click.edit()
    if issue is None or not issue.strip():
        print("nothing saved, aborting...")
        return
    # First line is the title, the remainder the description.  partition()
    # also copes with a title-only ticket that contains no newline, which
    # would make a two-way unpack of split() raise ValueError.
    title, _, description = issue.partition('\n')
    description = description.lstrip()
    url = base_url + f"/user/{username}/trackers/{tracker}/tickets"
    payload = {'title': title, 'description': description}
    r = requests.post(url, headers=headers, json=payload)
    _print_response(r)


@todo.command()
@click.argument('tracker')
@click.argument('ticketid')
@click.option('--status', type=click.Choice(TICKET_STATUS))
@click.option('--resolution', type=click.Choice(TICKET_RESOLUTION))
@click.option('--label', multiple=True)
def comment(tracker, ticketid, status, resolution, label):
    '''Comment, label, update ticket status'''
    payload = {}
    text = click.edit()
    if text:
        payload["comment"] = text
    if status is not None:
        payload["status"] = status
    # --label is a multiple option, so click passes an empty tuple (never
    # None) when it is not given; only send labels the user actually
    # supplied so an unlabelled invocation does not touch them.
    if label:
        payload["labels"] = label
    if resolution is not None:
        # The legacy API is a bit odd and only works to set the status and
        # resolution together to close or re-open. Status otherwise seems
        # irrelevant.
        if status != "reported":
            payload["status"] = "resolved"
        payload["resolution"] = resolution
    if not payload:
        print("nothing saved, aborting...")
        return
    print(payload)
    url = base_url + f"/user/{username}/trackers/{tracker}/tickets/{ticketid}"
    r = requests.put(url, headers=headers, json=payload)
    _print_response(r)


@todo.command("show")
@click.argument('tracker')
@click.argument('ticketid')
def show_ticket(tracker, ticketid):
    '''Show details of a ticket on a tracker'''
    url = base_url + f"/user/{username}/trackers/{tracker}/tickets/{ticketid}"
    r = requests.get(url, headers=headers)
    if r.status_code != 200:
        _print_response(r)
        return
    output.show_ticket(r.json())

    # fetch and display the events for the ticket as well
    r = requests.get(url + "/events", headers=headers)
    if r.status_code == 200:
        output.show_comments(r.json()['results'])
    else:
        _print_response(r)


@todo.command("trackers")
@click.option('--verbose', '-v', is_flag=True)
def get_trackers(verbose):
    '''List trackers'''
    url = base_url + f"/user/{username}/trackers"
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        output.show_trackers(r.json()['results'], verbose)
    else:
        _print_response(r)


@todo.command("labels")
@click.argument('tracker')
def get_labels(tracker):
    '''List labels for a tracker'''
    url = base_url + f"/user/{username}/trackers/{tracker}/labels"
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        output.show_labels(r.json()['results'])
    else:
        _print_response(r)


@todo.command("webhooks")
@click.argument('tracker')
def get_tracker_webhooks(tracker):
    '''List webhooks for a tracker'''
    url = base_url + f"/user/{username}/trackers/{tracker}/webhooks"
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        print(json.dumps(r.json(), indent=4))
    else:
        _print_response(r)


@todo.command("del-webhook")
@click.argument('tracker')
@click.argument('hookid')
def del_tacker_webhook(tracker, hookid):
    '''Delete webhook from a tracker'''
    url = base_url + f"/user/{username}/trackers/{tracker}/webhooks/{hookid}"
    r = requests.delete(url, headers=headers)
    _print_response(r)


@todo.command("add-webhook")
@click.argument('tracker')
@click.argument('url')
@click.option('--event', required=True, multiple=True)
def add_tacker_webhook(tracker, url, event):
    '''Add webhook to a tracker'''
    # `url` is the webhook target; the API endpoint gets its own name.
    payload = {'url': url, 'events': event}
    endpoint = base_url + f"/user/{username}/trackers/{tracker}/webhooks"
    r = requests.post(endpoint, headers=headers, json=payload)
    _print_response(r)