// Package github imports hourly GH Archive event dumps into PostgreSQL.
package github

import (
	"bufio"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/jackc/pgx/v4"
)

// GithubEvent is a single event record decoded from one line of an archive
// file.
type GithubEvent struct {
	Id         string             `json:"id"`
	Type       string             `json:"type"`
	Repository GithubRepository   `json:"repo"`
	Timestamp  time.Time          `json:"created_at"`
	Payload    GithubEventPayload `json:"payload"`
}

// GithubRepository identifies the repository an event belongs to.
type GithubRepository struct {
	Id   int64  `json:"id"`
	Name string `json:"name"`
}

// GithubEventPayload carries the type-specific part of an event. Only the
// sub-objects relevant to the event's type are populated by the decoder; the
// others keep their zero value.
type GithubEventPayload struct {
	Action      string                  `json:"action"`
	Release     ReleaseEventPayload     `json:"release"`
	Forkee      ForkEventPayload        `json:"forkee"`
	Issue       IssueEventPayload       `json:"issue"`
	PullRequest PullRequestEventPayload `json:"pull_request"`
}

// IssueEventPayload is the "issue" object of an IssuesEvent payload.
type IssueEventPayload struct {
	Id     int64 `json:"id"`
	Number int   `json:"number"`
}

// PullRequestEventPayload is the "pull_request" object of a PullRequestEvent
// payload.
type PullRequestEventPayload struct {
	Id     int64 `json:"id"`
	Number int   `json:"number"`
}

// ForkEventPayload is the "forkee" object of a ForkEvent payload. Name is
// decoded from the "full_name" field.
type ForkEventPayload struct {
	Id   int64  `json:"id"`
	Name string `json:"full_name"`
}

// ReleaseEventPayload is the "release" object of a ReleaseEvent payload.
type ReleaseEventPayload struct {
	Id      int64  `json:"id"`
	TagName string `json:"tag_name"`
}

// String returns a short human-readable description of the event, prefixed
// with its date (YYYY-MM-DD). It returns the empty string for event types that
// are not handled by this package.
func (evt *GithubEvent) String() string {
	day := evt.Timestamp.Format("2006-01-02")
	repo := evt.Repository.Name
	switch evt.Type {
	case "CreateEvent":
		return fmt.Sprintf("%s: Repository(%s)", day, repo)
	case "WatchEvent":
		return fmt.Sprintf("%s: Star(%s)", day, repo)
	case "PullRequestEvent":
		return fmt.Sprintf("%s: PR(%s, %d)", day, repo, evt.Payload.PullRequest.Number)
	case "ForkEvent":
		return fmt.Sprintf("%s: Fork(%s)", day, repo)
	case "IssuesEvent":
		return fmt.Sprintf("%s: Issue(%s, %d)", day, repo, evt.Payload.Issue.Number)
	case "ReleaseEvent":
		return fmt.Sprintf("%s: Release(%s, %s)", day, repo, evt.Payload.Release.TagName)
	}
	return ""
}

// IsRelevant reports whether the event should be written to the database.
// Create, watch and fork events are always relevant; pull requests and issues
// only when opened; releases only when released or published.
func (evt *GithubEvent) IsRelevant() bool {
	switch evt.Type {
	case "CreateEvent", "WatchEvent", "ForkEvent":
		return true
	case "PullRequestEvent", "IssuesEvent":
		return evt.Payload.Action == "opened"
	case "ReleaseEvent":
		return evt.Payload.Action == "released" || evt.Payload.Action == "published"
	}
	return false
}

// SQL statements and tuning constants used by the importer.
const (
	CreateEventQuery string = `
		INSERT INTO github_repository (id, name)
		VALUES ($1, $2)
		ON CONFLICT (id) DO NOTHING`

	WatchEventQuery string = `
		INSERT INTO github_repository (id, name, stars)
		VALUES ($1, $2, 1)
		ON CONFLICT (id) DO UPDATE SET stars = github_repository.stars + 1`

	PullRequestEventQuery string = `
		INSERT INTO github_repository (id, name, pull_requests)
		VALUES ($1, $2, 1)
		ON CONFLICT (id) DO UPDATE SET pull_requests = github_repository.pull_requests + 1`

	IssuesEventQuery string = `
		INSERT INTO github_repository (id, name, issues)
		VALUES ($1, $2, 1)
		ON CONFLICT (id) DO UPDATE SET issues = github_repository.issues + 1`

	ForkEventQueryParent string = `
		INSERT INTO github_repository (id, name, forks)
		VALUES ($1, $2, 1)
		ON CONFLICT (id) DO UPDATE SET forks = github_repository.forks + 1`

	ForkEventQueryChild string = `
		INSERT INTO github_repository (id, name, forked_from)
		VALUES ($1, $2, $3)
		ON CONFLICT (id) DO NOTHING`

	ReleaseEventQueryRepo string = `
		INSERT INTO github_repository (id, name)
		VALUES ($1, $2)
		ON CONFLICT (id) DO NOTHING`

	ReleaseEventQueryRelease string = `
		INSERT INTO github_release (id, repo_id, timestamp, tag_name)
		VALUES ($1, $2, $3, $4)
		ON CONFLICT (id) DO UPDATE SET timestamp = $3, tag_name = $4`

	PersistStateQuery string = `
		INSERT INTO github_event_package (id, last_event, done, last_error)
		VALUES ($1, $2, $3, $4)
		ON CONFLICT (id) DO UPDATE SET last_event = $2, done = $3, last_error = $4`

	GetStateQuery string = `
		SELECT id, last_event, done, last_error
		FROM github_event_package
		WHERE id = $1`

	GetNextPackageQuery string = `
		SELECT id, last_event, done, last_error
		FROM github_event_package
		WHERE NOT done
		ORDER BY id ASC
		LIMIT 1`

	GetLastPackageQuery string = `
		SELECT id, last_event, done, last_error
		FROM github_event_package
		ORDER BY id DESC
		LIMIT 1`

	// MaxTokenSize is the largest single line (in bytes) the archive scanner
	// accepts.
	MaxTokenSize int = 10 * 1024 * 1024

	// BackoffDelay is not used in this file.
	// NOTE(review): presumably the caller's retry delay — confirm.
	BackoffDelay time.Duration = 60 * time.Second
)

// GithubPackage is the import state of one hourly archive file. ID is the
// hour the archive covers; LastEvent is the id of the last event processed
// (used to resume an interrupted import); Done marks a fully imported package;
// LastError holds the message of the most recent failure.
type GithubPackage struct {
	ID        time.Time
	LastEvent string
	Done      bool
	LastError string
}

// queryPackage runs a query selecting (id, last_event, done, last_error) and
// returns the first row. The boolean result is false when the query produced
// no rows. The result set is closed before returning, so the connection can
// be reused immediately.
func queryPackage(db *pgx.Conn, query string, args ...interface{}) (GithubPackage, bool, error) {
	rows, err := db.Query(context.Background(), query, args...)
	if err != nil {
		return GithubPackage{}, false, err
	}
	defer rows.Close()

	if !rows.Next() {
		// Next also returns false on a read error, so distinguish
		// "no rows" from "query failed".
		return GithubPackage{}, false, rows.Err()
	}
	var pkg GithubPackage
	if err := rows.Scan(&pkg.ID, &pkg.LastEvent, &pkg.Done, &pkg.LastError); err != nil {
		return GithubPackage{}, false, err
	}
	return pkg, true, nil
}

// NewGithubPackage loads the stored state of the package with the given id.
// If no state is stored, it returns a fresh package carrying only that id.
func NewGithubPackage(db *pgx.Conn, id time.Time) (GithubPackage, error) {
	pkg, found, err := queryPackage(db, GetStateQuery, id)
	if err != nil {
		return GithubPackage{}, err
	}
	if !found {
		return GithubPackage{ID: id}, nil
	}
	return pkg, nil
}

// NextGithubPackage picks the package to import next:
//
//  1. the unfinished package with the lowest id, if any;
//  2. otherwise the successor (one hour later) of the package with the
//     highest id;
//  3. otherwise, when no package is stored at all, the package for
//     2015-01-01 00:00 UTC.
func NextGithubPackage(db *pgx.Conn) (GithubPackage, error) {
	pending, found, err := queryPackage(db, GetNextPackageQuery)
	if err != nil {
		return GithubPackage{}, err
	}
	if found {
		return pending, nil
	}

	last, found, err := queryPackage(db, GetLastPackageQuery)
	if err != nil {
		return GithubPackage{}, err
	}
	if found {
		return last.Next(), nil
	}

	return GithubPackage{ID: time.Date(2015, time.January, 1, 0, 0, 0, 0, time.UTC)}, nil
}

// Next returns a fresh package for the hour following pkg.
func (pkg *GithubPackage) Next() GithubPackage {
	return GithubPackage{ID: pkg.ID.Add(time.Hour)}
}

// Persist writes the package state to the database, inserting it or
// overwriting the stored row with the same id.
func (pkg *GithubPackage) Persist(db *pgx.Conn) error {
	// Exec (rather than Query) so that errors raised while executing the
	// statement are reported instead of being lost with the result set.
	_, err := db.Exec(context.Background(), PersistStateQuery, pkg.ID, pkg.LastEvent, pkg.Done, pkg.LastError)
	return err
}

// Import runs BareImport, records the outcome in the package state and
// persists that state. On failure LastError is set to the error message; on
// success LastEvent and LastError are cleared. If persisting fails, the
// persistence error is returned; otherwise the result of BareImport is.
func (pkg *GithubPackage) Import(db *pgx.Conn, stop chan os.Signal) error {
	importErr := pkg.BareImport(db, stop)
	if importErr != nil {
		pkg.LastError = importErr.Error()
		log.Printf("Import of package %s failed with error %s", pkg.String(), importErr)
	} else {
		log.Printf("Import of package %s completed successfully", pkg.String())
		pkg.LastEvent = ""
		pkg.LastError = ""
	}

	if err := pkg.Persist(db); err != nil {
		return err
	}
	return importErr
}

// String returns the package name in the form "YYYY-MM-DD-H" (UTC, hour not
// zero-padded), which is also the base name of the archive file.
func (pkg *GithubPackage) String() string {
	id := pkg.ID.UTC()
	return fmt.Sprintf("%s-%d", id.Format("2006-01-02"), id.Hour())
}

// InterruptedError is returned by BareImport (as *InterruptedError) when a
// signal arrives on the stop channel during an import.
type InterruptedError struct {
	Signal os.Signal
}

// Error implements the error interface.
func (err InterruptedError) Error() string {
	return fmt.Sprintf("Interrupted by %s", err.Signal.String())
}

// importEvent writes a single relevant event to the database inside its own
// transaction. Keeping the transaction in a separate function makes the
// deferred rollback run once per event instead of piling up until the whole
// archive has been processed.
func importEvent(db *pgx.Conn, evt *GithubEvent) error {
	ctx := context.Background()
	tx, err := db.Begin(ctx)
	if err != nil {
		return err
	}
	// Rolls back on any early return; after a successful Commit the
	// transaction is already closed and the resulting error is irrelevant.
	defer func() { _ = tx.Rollback(ctx) }()

	repo := evt.Repository
	switch evt.Type {
	case "CreateEvent":
		_, err = tx.Exec(ctx, CreateEventQuery, repo.Id, repo.Name)
	case "WatchEvent":
		_, err = tx.Exec(ctx, WatchEventQuery, repo.Id, repo.Name)
	case "PullRequestEvent":
		_, err = tx.Exec(ctx, PullRequestEventQuery, repo.Id, repo.Name)
	case "IssuesEvent":
		_, err = tx.Exec(ctx, IssuesEventQuery, repo.Id, repo.Name)
	case "ForkEvent":
		// Count the fork on the parent first, then register the child.
		if _, err = tx.Exec(ctx, ForkEventQueryParent, repo.Id, repo.Name); err != nil {
			return err
		}
		_, err = tx.Exec(ctx, ForkEventQueryChild, evt.Payload.Forkee.Id, evt.Payload.Forkee.Name, repo.Id)
	case "ReleaseEvent":
		// Make sure the repository row exists before inserting the release.
		if _, err = tx.Exec(ctx, ReleaseEventQueryRepo, repo.Id, repo.Name); err != nil {
			return err
		}
		_, err = tx.Exec(ctx, ReleaseEventQueryRelease, evt.Payload.Release.Id, repo.Id, evt.Timestamp, evt.Payload.Release.TagName)
	}
	if err != nil {
		return err
	}
	return tx.Commit(ctx)
}

// BareImport downloads the package's archive from data.gharchive.org and
// writes every relevant event to the database, one transaction per event.
//
// If pkg.LastEvent is set, events up to and including the one with that id
// are skipped so that an interrupted import resumes where it stopped.
// pkg.LastEvent is advanced after every processed event, and pkg.Done is set
// once the whole archive has been read.
//
// After each event the stop channel is polled without blocking; if a signal
// is pending, the import ends with an *InterruptedError. Any download,
// decoding or database error aborts the import and is returned as is.
func (pkg *GithubPackage) BareImport(db *pgx.Conn, stop chan os.Signal) error {
	url := fmt.Sprintf("https://data.gharchive.org/%s.json.gz", pkg.String())
	log.Printf("Fetching %s...", url)

	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	// The body must be closed on every path, including the bad-status one.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Message text is kept unchanged because it ends up in the
		// persisted LastError column.
		return fmt.Errorf("Wrong status code: %d", resp.StatusCode)
	}

	gz, err := gzip.NewReader(resp.Body)
	if err != nil {
		return err
	}
	defer gz.Close()

	// Start with a small buffer and let the scanner grow it on demand up to
	// MaxTokenSize, instead of allocating the full maximum for every package.
	scanner := bufio.NewScanner(gz)
	scanner.Buffer(make([]byte, 0, 64*1024), MaxTokenSize)

	skip := pkg.LastEvent != ""
	if skip {
		log.Printf("Resuming import at %s...", pkg.LastEvent)
	} else {
		log.Printf("Started import of package %s...", pkg.String())
	}

	for scanner.Scan() {
		var evt GithubEvent
		if err := json.Unmarshal(scanner.Bytes(), &evt); err != nil {
			return err
		}

		if skip {
			// NOTE(review): if the resume marker never appears in the
			// archive, every event is skipped and the package is still
			// marked done — confirm this is intended.
			if pkg.LastEvent == evt.Id {
				skip = false
				log.Printf("Started import of package %s...", pkg.String())
			}
			continue
		}

		if evt.IsRelevant() {
			if err := importEvent(db, &evt); err != nil {
				return err
			}
		}
		pkg.LastEvent = evt.Id

		// Non-blocking check for a pending stop signal.
		select {
		case sig := <-stop:
			return &InterruptedError{Signal: sig}
		default:
		}
	}
	if err := scanner.Err(); err != nil {
		return err
	}

	pkg.Done = true
	return nil
}