# [DEF:backend.src.services.git_service:Module]
#
# @SEMANTICS: git, service, gitpython, repository, version_control
# @PURPOSE: Core Git logic using GitPython to manage dashboard repositories.
# @LAYER: Service
# @RELATION: INHERITS_FROM -> None
# @RELATION: USED_BY -> src.api.routes.git
# @RELATION: USED_BY -> src.plugins.git_plugin
#
# @INVARIANT: All Git operations must be performed on a valid local directory.

import os
import shutil
from datetime import datetime
from typing import List, Optional
from urllib.parse import quote

import httpx
from fastapi import HTTPException
from git import Repo, RemoteProgress

from src.core.logger import logger, belief_scope
from src.models.git import GitProvider


# [DEF:GitService:Class]
# @PURPOSE: Wrapper for GitPython operations with semantic logging and error handling.
class GitService:
    """
    Wrapper for GitPython operations.

    Each dashboard gets its own clone under ``base_path/<dashboard_id>``.
    """

    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the GitService with a base path for repositories.
    # @PARAM: base_path (str) - Root directory for all Git clones.
    def __init__(self, base_path: str = "backend/git_repos"):
        with belief_scope("GitService.__init__"):
            self.base_path = base_path
            # exist_ok avoids the check-then-create race of a separate exists() test.
            os.makedirs(self.base_path, exist_ok=True)
    # [/DEF:__init__:Function]

    # [DEF:_get_repo_path:Function]
    # @PURPOSE: Resolves the local filesystem path for a dashboard's repository.
    # @PARAM: dashboard_id (int)
    # @RETURN: str - base_path joined with the stringified dashboard id.
    def _get_repo_path(self, dashboard_id: int) -> str:
        return os.path.join(self.base_path, str(dashboard_id))
    # [/DEF:_get_repo_path:Function]

    # [DEF:init_repo:Function]
    # @PURPOSE: Initialize or clone a repository for a dashboard.
    # @PARAM: dashboard_id (int)
    # @PARAM: remote_url (str)
    # @PARAM: pat (str) - Personal Access Token for authentication.
    # @RETURN: Repo - GitPython Repo object.
    def init_repo(self, dashboard_id: int, remote_url: str, pat: str) -> Repo:
        with belief_scope("GitService.init_repo"):
            repo_path = self._get_repo_path(dashboard_id)

            # An existing directory is opened as-is; remote_url and pat are ignored.
            if os.path.exists(repo_path):
                logger.info(f"[init_repo][Action] Opening existing repo at {repo_path}")
                return Repo(repo_path)

            # Inject PAT into remote URL if needed. The token is percent-encoded
            # so characters such as '@', '/' or ':' cannot corrupt the URL.
            if pat and "://" in remote_url:
                proto, rest = remote_url.split("://", 1)
                encoded_pat = quote(pat, safe="")
                auth_url = f"{proto}://oauth2:{encoded_pat}@{rest}"
            else:
                auth_url = remote_url

            # Only the credential-free URL is logged.
            logger.info(f"[init_repo][Action] Cloning {remote_url} to {repo_path}")
            try:
                return Repo.clone_from(auth_url, repo_path)
            except Exception:
                # repo_path did not exist before this call, so anything left there
                # is residue of the failed clone; remove it so the next call does
                # not try to open a partial directory as a repository.
                shutil.rmtree(repo_path, ignore_errors=True)
                raise
    # [/DEF:init_repo:Function]

    # [DEF:get_repo:Function]
    # @PURPOSE: Get Repo object for a dashboard.
    # @PRE: Repository must exist on disk.
    # @RETURN: Repo
    # @RAISES: HTTPException(404) if the repository directory does not exist,
    #          HTTPException(500) if it exists but cannot be opened.
    def get_repo(self, dashboard_id: int) -> Repo:
        with belief_scope("GitService.get_repo"):
            repo_path = self._get_repo_path(dashboard_id)
            if not os.path.exists(repo_path):
                logger.error(f"[get_repo][Coherence:Failed] Repository for dashboard {dashboard_id} does not exist")
                raise HTTPException(status_code=404, detail=f"Repository for dashboard {dashboard_id} not found")
            try:
                return Repo(repo_path)
            except Exception as e:
                logger.error(f"[get_repo][Coherence:Failed] Failed to open repository at {repo_path}: {e}")
                raise HTTPException(status_code=500, detail="Failed to open local Git repository") from e
    # [/DEF:get_repo:Function]

    # [DEF:list_branches:Function]
    # @PURPOSE: List all branches for a dashboard's repository.
    # @PARAM: dashboard_id (int)
    # @RETURN: List[dict] - Entries with keys name, commit_hash, is_remote, last_updated.
    #          Never empty: falls back to the active branch, then to a "main" placeholder.
    def list_branches(self, dashboard_id: int) -> List[dict]:
        with belief_scope("GitService.list_branches"):
            repo = self.get_repo(dashboard_id)
            logger.info(f"[list_branches][Action] Listing branches for {dashboard_id}. Refs: {repo.refs}")
            branches: List[dict] = []
            seen_names = set()  # O(1) duplicate detection instead of rescanning `branches`

            # Add existing refs
            for ref in repo.refs:
                try:
                    # Strip prefixes for UI
                    # NOTE(review): GitPython's `ref.name` appears to be already
                    # shortened (e.g. "origin/main"), so these replacements may be
                    # no-ops and remote names may keep their "origin/" prefix — confirm.
                    name = ref.name.replace('refs/heads/', '').replace('refs/remotes/origin/', '')

                    # Avoid duplicates (e.g. local and remote with same name)
                    if name in seen_names:
                        continue

                    has_commit = hasattr(ref, 'commit')
                    # NOTE(review): fromtimestamp() yields naive local time while
                    # utcnow() yields naive UTC — confirm which one consumers expect.
                    branches.append({
                        "name": name,
                        "commit_hash": ref.commit.hexsha if has_commit else "0000000",
                        "is_remote": ref.is_remote() if hasattr(ref, 'is_remote') else False,
                        "last_updated": datetime.fromtimestamp(ref.commit.committed_date) if has_commit else datetime.utcnow()
                    })
                    seen_names.add(name)
                except Exception as e:
                    # Best effort: one unreadable ref must not hide the others.
                    logger.warning(f"[list_branches][Action] Skipping ref {ref}: {e}")

            # Ensure the current active branch is in the list even if it has no commits or refs
            try:
                active_name = repo.active_branch.name
                if active_name not in seen_names:
                    branches.append({
                        "name": active_name,
                        "commit_hash": "0000000",
                        "is_remote": False,
                        "last_updated": datetime.utcnow()
                    })
                    seen_names.add(active_name)
            except Exception as e:
                logger.warning(f"[list_branches][Action] Could not determine active branch: {e}")

            # If everything else failed and list is still empty, add default
            if not branches:
                branches.append({
                    "name": "main",
                    "commit_hash": "0000000",
                    "is_remote": False,
                    "last_updated": datetime.utcnow()
                })

            return branches
    # [/DEF:list_branches:Function]

    # [DEF:create_branch:Function]
    # @PURPOSE: Create a new branch from an existing one.
    # @PARAM: dashboard_id (int)
    # @PARAM: name (str) - New branch name.
    # @PARAM: from_branch (str) - Source branch; falls back to HEAD when it cannot be resolved.
    # @RETURN: The head object returned by repo.create_head.
    # @RAISES: Re-raises whatever repo.create_head raises after logging it.
    def create_branch(self, dashboard_id: int, name: str, from_branch: str = "main"):
        with belief_scope("GitService.create_branch"):
            repo = self.get_repo(dashboard_id)
            logger.info(f"[create_branch][Action] Creating branch {name} from {from_branch}")

            # Handle empty repository case (no commits). A clone of an empty remote
            # still has an 'origin' remote, so an unborn HEAD is checked as well.
            is_empty = not repo.heads and (not repo.remotes or not repo.head.is_valid())
            if is_empty:
                logger.warning(f"[create_branch][Action] Repository is empty. Creating initial commit to enable branching.")
                readme_path = os.path.join(repo.working_dir, "README.md")
                if not os.path.exists(readme_path):
                    with open(readme_path, "w", encoding="utf-8") as f:
                        f.write(f"# Dashboard {dashboard_id}\nGit repository for Superset dashboard integration.")
                repo.index.add(["README.md"])
                repo.index.commit("Initial commit")

            # Verify source branch exists
            try:
                repo.commit(from_branch)
            except Exception:
                # `Exception` rather than a bare except, so KeyboardInterrupt and
                # SystemExit are not swallowed by the fallback.
                logger.warning(f"[create_branch][Action] Source branch {from_branch} not found, using HEAD")
                from_branch = repo.head

            try:
                new_branch = repo.create_head(name, from_branch)
                return new_branch
            except Exception as e:
                logger.error(f"[create_branch][Coherence:Failed] {e}")
                raise
    # [/DEF:create_branch:Function]

    # [DEF:checkout_branch:Function]
    # @PURPOSE: Switch to a specific branch.
    # @PARAM: dashboard_id (int)
    # @PARAM: name (str) - Branch name passed to `git checkout`.
    def checkout_branch(self, dashboard_id: int, name: str):
        with belief_scope("GitService.checkout_branch"):
            repo = self.get_repo(dashboard_id)
            logger.info(f"[checkout_branch][Action] Checking out branch {name}")
            repo.git.checkout(name)
    # [/DEF:checkout_branch:Function]

    # [DEF:commit_changes:Function]
    # @PURPOSE: Stage and commit changes.
    # @PARAM: dashboard_id (int)
    # @PARAM: message (str) - Commit message.
    # @PARAM: files (List[str]) - Optional list of specific files to stage; everything is staged when omitted.
    # @RETURN: None - Returns without committing when the tree is clean and no files were given.
    def commit_changes(self, dashboard_id: int, message: str, files: Optional[List[str]] = None):
        with belief_scope("GitService.commit_changes"):
            repo = self.get_repo(dashboard_id)

            # Check if there are any changes to commit
            if not repo.is_dirty(untracked_files=True) and not files:
                logger.info(f"[commit_changes][Action] No changes to commit for dashboard {dashboard_id}")
                return

            if files:
                logger.info(f"[commit_changes][Action] Staging files: {files}")
                repo.index.add(files)
            else:
                logger.info("[commit_changes][Action] Staging all changes")
                repo.git.add(A=True)

            repo.index.commit(message)
            logger.info(f"[commit_changes][Coherence:OK] Committed changes with message: {message}")
    # [/DEF:commit_changes:Function]

    # [DEF:push_changes:Function]
    # @PURPOSE: Push local commits to remote.
    # @PARAM: dashboard_id (int)
    # @RAISES: HTTPException(400) if remote 'origin' is not configured,
    #          HTTPException(500) if the push fails or reports an error flag.
    def push_changes(self, dashboard_id: int):
        with belief_scope("GitService.push_changes"):
            repo = self.get_repo(dashboard_id)

            # Ensure we have something to push
            if not repo.heads:
                logger.warning(f"[push_changes][Coherence:Failed] No local branches to push for dashboard {dashboard_id}")
                return

            try:
                origin = repo.remote(name='origin')
            except ValueError as e:
                logger.error(f"[push_changes][Coherence:Failed] Remote 'origin' not found for dashboard {dashboard_id}")
                raise HTTPException(status_code=400, detail="Remote 'origin' not configured") from e

            try:
                current_branch = repo.active_branch
                logger.info(f"[push_changes][Action] Pushing branch {current_branch.name} to origin")
                # Push the active branch to the same-named branch on origin.
                push_info = origin.push(refspec=f'{current_branch.name}:{current_branch.name}')
                for info in push_info:
                    if info.flags & info.ERROR:
                        logger.error(f"[push_changes][Coherence:Failed] Error pushing ref {info.remote_ref_string}: {info.summary}")
                        raise Exception(f"Git push error for {info.remote_ref_string}: {info.summary}")
            except Exception as e:
                logger.error(f"[push_changes][Coherence:Failed] Failed to push changes: {e}")
                raise HTTPException(status_code=500, detail=f"Git push failed: {str(e)}") from e
    # [/DEF:push_changes:Function]

    # [DEF:pull_changes:Function]
    # @PURPOSE: Pull changes from remote.
    # @PARAM: dashboard_id (int)
    # @RAISES: HTTPException(400) if remote 'origin' is not configured,
    #          HTTPException(500) if the pull fails or reports an error flag.
    def pull_changes(self, dashboard_id: int):
        with belief_scope("GitService.pull_changes"):
            repo = self.get_repo(dashboard_id)

            # The remote lookup is isolated so that only a missing remote maps to
            # 400; a ValueError raised during the pull itself is reported as 500.
            try:
                origin = repo.remote(name='origin')
            except ValueError as e:
                logger.error(f"[pull_changes][Coherence:Failed] Remote 'origin' not found for dashboard {dashboard_id}")
                raise HTTPException(status_code=400, detail="Remote 'origin' not configured") from e

            try:
                logger.info("[pull_changes][Action] Pulling changes from origin")
                fetch_info = origin.pull()
                for info in fetch_info:
                    if info.flags & info.ERROR:
                        logger.error(f"[pull_changes][Coherence:Failed] Error pulling ref {info.ref}: {info.note}")
                        raise Exception(f"Git pull error for {info.ref}: {info.note}")
            except Exception as e:
                logger.error(f"[pull_changes][Coherence:Failed] Failed to pull changes: {e}")
                raise HTTPException(status_code=500, detail=f"Git pull failed: {str(e)}") from e
    # [/DEF:pull_changes:Function]

    # [DEF:get_status:Function]
    # @PURPOSE: Get current repository status (dirty files, untracked, etc.)
    # @PARAM: dashboard_id (int)
    # @RETURN: dict - Keys: is_dirty, untracked_files, modified_files, staged_files, current_branch.
    def get_status(self, dashboard_id: int) -> dict:
        with belief_scope("GitService.get_status"):
            repo = self.get_repo(dashboard_id)

            # Handle empty repository (no commits): resolving HEAD's commit raises
            # there, and diffing the index against "HEAD" is skipped in that case.
            try:
                _ = repo.head.commit
                has_commits = True
            except Exception:
                has_commits = False

            return {
                "is_dirty": repo.is_dirty(untracked_files=True),
                "untracked_files": repo.untracked_files,
                "modified_files": [item.a_path for item in repo.index.diff(None)],
                "staged_files": [item.a_path for item in repo.index.diff("HEAD")] if has_commits else [],
                "current_branch": repo.active_branch.name
            }
    # [/DEF:get_status:Function]

    # [DEF:get_diff:Function]
    # @PURPOSE: Generate diff for a file or the whole repository.
    # @PARAM: dashboard_id (int)
    # @PARAM: file_path (str) - Optional specific file.
    # @PARAM: staged (bool) - Whether to show staged changes.
    # @RETURN: str - Output of `git diff`.
    def get_diff(self, dashboard_id: int, file_path: Optional[str] = None, staged: bool = False) -> str:
        with belief_scope("GitService.get_diff"):
            repo = self.get_repo(dashboard_id)
            diff_args = []
            if staged:
                diff_args.append("--staged")

            if file_path:
                # "--" separates options from the path argument.
                return repo.git.diff(*diff_args, "--", file_path)
            return repo.git.diff(*diff_args)
    # [/DEF:get_diff:Function]

    # [DEF:get_commit_history:Function]
    # @PURPOSE: Retrieve commit history for a repository.
    # @PARAM: dashboard_id (int)
    # @PARAM: limit (int) - Max number of commits to return.
    # @RETURN: List[dict] - Keys: hash, author, email, timestamp, message, files_changed.
    #          Empty when the repository has no history or reading it fails.
    def get_commit_history(self, dashboard_id: int, limit: int = 50) -> List[dict]:
        with belief_scope("GitService.get_commit_history"):
            repo = self.get_repo(dashboard_id)
            commits = []
            try:
                # Check if there are any commits at all
                if not repo.heads and not repo.remotes:
                    return []

                for commit in repo.iter_commits(max_count=limit):
                    commits.append({
                        "hash": commit.hexsha,
                        "author": commit.author.name,
                        "email": commit.author.email,
                        "timestamp": datetime.fromtimestamp(commit.committed_date),
                        "message": commit.message.strip(),
                        "files_changed": list(commit.stats.files.keys())
                    })
            except Exception as e:
                # Best effort: any failure discards partial results and yields [].
                logger.warning(f"[get_commit_history][Action] Could not retrieve commit history for dashboard {dashboard_id}: {e}")
                return []
            return commits
    # [/DEF:get_commit_history:Function]

    # [DEF:test_connection:Function]
    # @PURPOSE: Test connection to Git provider using PAT.
    # @PARAM: provider (GitProvider)
    # @PARAM: url (str)
    # @PARAM: pat (str)
    # @RETURN: bool - True when the provider's user endpoint answers 200, or when the
    #          URL looks local; False for bad input, unknown provider, or any error.
    async def test_connection(self, provider: GitProvider, url: str, pat: str) -> bool:
        with belief_scope("GitService.test_connection"):
            # Check for offline mode or local-only URLs
            # NOTE(review): this is a substring match over the whole URL, so it also
            # matches when these words appear in a path or query — confirm intended.
            if ".local" in url or "localhost" in url:
                logger.info("[test_connection][Action] Local/Offline mode detected for URL")
                return True

            if not url.startswith(('http://', 'https://')):
                logger.error(f"[test_connection][Coherence:Failed] Invalid URL protocol: {url}")
                return False

            if not pat or not pat.strip():
                logger.error("[test_connection][Coherence:Failed] Git PAT is missing or empty")
                return False

            pat = pat.strip()
            base_url = url.rstrip('/')

            try:
                # Resolve the provider-specific endpoint and auth header first so
                # the request itself is issued in one place.
                if provider == GitProvider.GITHUB:
                    headers = {"Authorization": f"token {pat}"}
                    api_url = "https://api.github.com/user" if "github.com" in url else f"{base_url}/api/v3/user"
                elif provider == GitProvider.GITLAB:
                    headers = {"PRIVATE-TOKEN": pat}
                    api_url = f"{base_url}/api/v4/user"
                elif provider == GitProvider.GITEA:
                    headers = {"Authorization": f"token {pat}"}
                    api_url = f"{base_url}/api/v1/user"
                else:
                    return False

                async with httpx.AsyncClient() as client:
                    resp = await client.get(api_url, headers=headers)

                if resp.status_code != 200:
                    logger.error(f"[test_connection][Coherence:Failed] Git connection test failed for {provider} at {api_url}. Status: {resp.status_code}")
                return resp.status_code == 200
            except Exception as e:
                logger.error(f"[test_connection][Coherence:Failed] Error testing git connection: {e}")
                return False
    # [/DEF:test_connection:Function]

# [/DEF:GitService:Class]
# [/DEF:backend.src.services.git_service:Module]